1 //------------------------------------------------------------ 2 // Copyright (c) Microsoft Corporation. All rights reserved. 3 //------------------------------------------------------------ 4 5 namespace System.ServiceModel.Channels 6 { 7 using System.Runtime; 8 using System.ServiceModel; 9 using System.Transactions; 10 using SR = System.ServiceModel.SR; 11 using System.Threading; 12 13 sealed class MsmqInputSessionChannel : InputChannel, IInputSessionChannel 14 { 15 IInputSession session; 16 Transaction associatedTx; 17 ReceiveContext sessiongramReceiveContext; 18 bool receiveContextEnabled; 19 bool sessiongramDoomed; 20 21 // count of messages that have been pulled out of the base queue but Complete has not been called on them 22 int incompleteMessageCount; 23 24 // count of messages that have been completed but the transaction has not been committed 25 int uncommittedMessageCount; 26 MsmqInputSessionChannel(MsmqInputSessionChannelListener listener, Transaction associatedTx, ReceiveContext sessiongramReceiveContext)27 public MsmqInputSessionChannel(MsmqInputSessionChannelListener listener, Transaction associatedTx, ReceiveContext sessiongramReceiveContext) 28 : base(listener, new EndpointAddress(listener.Uri)) 29 { 30 this.session = new InputSession(); 31 this.incompleteMessageCount = 0; 32 33 if (sessiongramReceiveContext == null) 34 { 35 this.receiveContextEnabled = false; 36 37 // only enlist if we are running in a non-receive context mode 38 this.associatedTx = associatedTx; 39 this.associatedTx.EnlistVolatile(new TransactionEnlistment(this, this.associatedTx), EnlistmentOptions.None); 40 } 41 else 42 { 43 //ignore the ambient transaction if any 44 this.receiveContextEnabled = true; 45 this.sessiongramReceiveContext = sessiongramReceiveContext; 46 this.sessiongramDoomed = false; 47 } 48 } 49 50 public IInputSession Session 51 { 52 get { return this.session; } 53 } 54 55 int TotalPendingItems 56 { 57 get 58 { 59 return this.InternalPendingItems + this.incompleteMessageCount; 60 } 61 } 62 DetachTransaction(bool aborted)63 void DetachTransaction(bool aborted) 64 { 65 // disassociate the session channel from the current transaction and enlistment 66 this.associatedTx = null; 67 if (aborted) 68 { 69 this.incompleteMessageCount += this.uncommittedMessageCount; 70 } 71 this.uncommittedMessageCount = 0; 72 } 73 AbandonMessage(TimeSpan timeout)74 void AbandonMessage(TimeSpan timeout) 75 { 76 ThrowIfFaulted(); 77 this.sessiongramDoomed = true; 78 } 79 CompleteMessage(TimeSpan timeout)80 void CompleteMessage(TimeSpan timeout) 81 { 82 ThrowIfFaulted(); 83 EnsureReceiveContextTransaction(); 84 85 // the message is now off to transaction land 86 Interlocked.Increment(ref uncommittedMessageCount); 87 Interlocked.Decrement(ref incompleteMessageCount); 88 } 89 Receive()90 public override Message Receive() 91 { 92 return this.Receive(this.DefaultReceiveTimeout); 93 } 94 Receive(TimeSpan timeout)95 public override Message Receive(TimeSpan timeout) 96 { 97 return InputChannel.HelpReceive(this, timeout); 98 } 99 BeginReceive(AsyncCallback callback, object state)100 public override IAsyncResult BeginReceive(AsyncCallback callback, object state) 101 { 102 return this.BeginReceive(this.DefaultReceiveTimeout, callback, state); 103 } 104 BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)105 public override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) 106 { 107 return InputChannel.HelpBeginReceive(this, timeout, callback, state); 108 } 109 TryReceive(TimeSpan timeout, out Message message)110 public override bool TryReceive(TimeSpan timeout, out Message message) 111 { 112 ThrowIfFaulted(); 113 if (CommunicationState.Closed == this.State || CommunicationState.Closing == this.State) 114 { 115 message = null; 116 return true; 117 } 118 119 // we don't look at the transaction in the receive if receive context is enabled 120 if (!this.receiveContextEnabled) 121 { 122 VerifyTransaction(); 123 } 124 125 bool receiveSuccessful = base.TryReceive(timeout, out message); 126 127 if (receiveSuccessful && message != null && this.receiveContextEnabled) 128 { 129 message.Properties[ReceiveContext.Name] = new MsmqSessionReceiveContext(this); 130 Interlocked.Increment(ref incompleteMessageCount); 131 } 132 133 return receiveSuccessful; 134 } 135 BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)136 public override IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) 137 { 138 ThrowIfFaulted(); 139 if (CommunicationState.Closed == this.State || CommunicationState.Closing == this.State) 140 { 141 return new CompletedAsyncResult<bool, Message>(true, null, callback, state); 142 } 143 // we don't look at the transaction in the receive if receive context is enabled 144 if (!this.receiveContextEnabled) 145 { 146 VerifyTransaction(); 147 } 148 return base.BeginTryReceive(timeout, callback, state); 149 } 150 EndTryReceive(IAsyncResult result, out Message message)151 public override bool EndTryReceive(IAsyncResult result, out Message message) 152 { 153 CompletedAsyncResult<bool, Message> completedResult = result as CompletedAsyncResult<bool, Message>; 154 155 if (null != completedResult) 156 { 157 return CompletedAsyncResult<bool, Message>.End(result, out message); 158 } 159 else 160 { 161 bool receiveSuccessful = base.EndTryReceive(result, out message); 162 if (receiveSuccessful && message != null && this.receiveContextEnabled) 163 { 164 message.Properties[ReceiveContext.Name] = new MsmqSessionReceiveContext(this); 165 Interlocked.Increment(ref incompleteMessageCount); 166 } 167 168 return receiveSuccessful; 169 } 170 } 171 FaultChannel()172 public void FaultChannel() 173 { 174 this.Fault(); 175 } 176 OnCloseReceiveContext(bool isAborting)177 void OnCloseReceiveContext(bool isAborting) 178 { 179 if (isAborting) 180 { 181 // can't do much on Channel.Abort if the transaction had already committed 182 if (this.associatedTx != null) 183 { 184 // Channel.Abort called within the associated transaction 185 Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelAbort))); 186 RollbackTransaction(e); 187 } 188 this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue); 189 } 190 else 191 { 192 if (this.TotalPendingItems > 0) 193 { 194 // no need for rollback, it will happen automatically when this condition is hit in the Prepare() call 195 this.Fault(); 196 this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue); 197 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionPrematureClose))); 198 } 199 } 200 } 201 OnCloseTransactional(bool isAborting)202 void OnCloseTransactional(bool isAborting) 203 { 204 if (isAborting) 205 { 206 RollbackTransaction(null); 207 } 208 else 209 { 210 VerifyTransaction(); 211 if (this.InternalPendingItems > 0) 212 { 213 RollbackTransaction(null); 214 this.Fault(); 215 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionMessagesNotConsumed))); 216 } 217 } 218 } 219 OnCloseCore(bool isAborting)220 void OnCloseCore(bool isAborting) 221 { 222 if (this.receiveContextEnabled) 223 { 224 OnCloseReceiveContext(isAborting); 225 } 226 else 227 { 228 OnCloseTransactional(isAborting); 229 } 230 } 231 OnAbort()232 protected override void OnAbort() 233 { 234 OnCloseCore(true); 235 base.OnAbort(); 236 } 237 OnClose(TimeSpan timeout)238 protected override void OnClose(TimeSpan timeout) 239 { 240 OnCloseCore(false); 241 base.OnClose(timeout); 242 } 243 OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)244 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) 245 { 246 OnCloseCore(false); 247 return base.OnBeginClose(timeout, callback, state); 248 } 249 RollbackTransaction(Exception exception)250 void RollbackTransaction(Exception exception) 251 { 252 try 253 { 254 if (TransactionStatus.Active == this.associatedTx.TransactionInformation.Status) 255 this.associatedTx.Rollback(exception); 256 } 257 catch (TransactionAbortedException ex) 258 { 259 MsmqDiagnostics.ExpectedException(ex); 260 } 261 catch (ObjectDisposedException ex) 262 { 263 MsmqDiagnostics.ExpectedException(ex); 264 } 265 } 266 EnsureReceiveContextTransaction()267 void EnsureReceiveContextTransaction() 268 { 269 // if this is the first time we are seeing this transaction in receivecontext enabled mode then enlist and 270 // associate the session channel with this transaction 271 if (Transaction.Current == null) 272 { 273 throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionRequired))); 274 } 275 276 if (this.associatedTx == null) 277 { 278 this.associatedTx = Transaction.Current; 279 this.associatedTx.EnlistVolatile(new ReceiveContextTransactionEnlistment(this, this.associatedTx, this.sessiongramReceiveContext), 280 EnlistmentOptions.EnlistDuringPrepareRequired); 281 } 282 else 283 { 284 if (this.associatedTx != Transaction.Current) 285 { 286 RollbackTransaction(null); 287 this.Fault(); 288 throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqSameTransactionExpected))); 289 } 290 291 if (TransactionStatus.Active != Transaction.Current.TransactionInformation.Status) 292 { 293 this.Fault(); 294 throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionNotActive))); 295 } 296 } 297 } 298 VerifyTransaction()299 void VerifyTransaction() 300 { 301 if (this.InternalPendingItems > 0) 302 { 303 if (this.associatedTx != Transaction.Current) 304 { 305 RollbackTransaction(null); 306 this.Fault(); 307 throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqSameTransactionExpected))); 308 } 309 310 if (TransactionStatus.Active != Transaction.Current.TransactionInformation.Status) 311 { 312 RollbackTransaction(null); 313 this.Fault(); 314 throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionNotActive))); 315 } 316 } 317 } 318 319 class InputSession : IInputSession 320 { 321 string id = "uuid://session-gram/" + Guid.NewGuid().ToString(); 322 323 public string Id 324 { 325 get { return this.id; } 326 } 327 } 328 329 class MsmqSessionReceiveContext : ReceiveContext 330 { 331 MsmqInputSessionChannel channel; 332 MsmqSessionReceiveContext(MsmqInputSessionChannel channel)333 public MsmqSessionReceiveContext(MsmqInputSessionChannel channel) 334 { 335 this.channel = channel; 336 } 337 OnAbandon(TimeSpan timeout)338 protected override void OnAbandon(TimeSpan timeout) 339 { 340 this.channel.AbandonMessage(timeout); 341 } 342 OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state)343 protected override IAsyncResult OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state) 344 { 345 return SessionReceiveContextAsyncResult.CreateAbandon(this, timeout, callback, state); 346 } 347 OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state)348 protected override IAsyncResult OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state) 349 { 350 return SessionReceiveContextAsyncResult.CreateComplete(this, timeout, callback, state); 351 } 352 OnComplete(TimeSpan timeout)353 protected override void OnComplete(TimeSpan timeout) 354 { 355 this.channel.CompleteMessage(timeout); 356 } 357 OnEndAbandon(IAsyncResult result)358 protected override void OnEndAbandon(IAsyncResult result) 359 { 360 SessionReceiveContextAsyncResult.End(result); 361 } 362 OnEndComplete(IAsyncResult result)363 protected override void OnEndComplete(IAsyncResult result) 364 { 365 SessionReceiveContextAsyncResult.End(result); 366 } 367 368 class SessionReceiveContextAsyncResult : AsyncResult 369 { 370 MsmqSessionReceiveContext receiveContext; 371 Transaction completionTransaction; 372 373 TimeoutHelper timeoutHelper; 374 static Action<object> onComplete; 375 static Action<object> onAbandon; 376 SessionReceiveContextAsyncResult(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state, Action<object> target)377 SessionReceiveContextAsyncResult(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state, Action<object> target) 378 : base(callback, state) 379 { 380 this.completionTransaction = Transaction.Current; 381 this.timeoutHelper = new TimeoutHelper(timeout); 382 this.receiveContext = receiveContext; 383 ActionItem.Schedule(target, this); 384 } 385 CreateComplete(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)386 public static IAsyncResult CreateComplete(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state) 387 { 388 if (onComplete == null) 389 { 390 onComplete = new Action<object>(OnComplete); 391 } 392 return new SessionReceiveContextAsyncResult(receiveContext, timeout, callback, state, onComplete); 393 } 394 CreateAbandon(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)395 public static IAsyncResult CreateAbandon(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state) 396 { 397 if (onAbandon == null) 398 { 399 onAbandon = new Action<object>(OnAbandon); 400 } 401 return new SessionReceiveContextAsyncResult(receiveContext, timeout, callback, state, onAbandon); 402 } 403 OnComplete(object parameter)404 static void OnComplete(object parameter) 405 { 406 SessionReceiveContextAsyncResult result = parameter as SessionReceiveContextAsyncResult; 407 Transaction savedTransaction = Transaction.Current; 408 Transaction.Current = result.completionTransaction; 409 410 try 411 { 412 Exception completionException = null; 413 try 414 { 415 result.receiveContext.OnComplete(result.timeoutHelper.RemainingTime()); 416 } 417 catch (Exception e) 418 { 419 if (Fx.IsFatal(e)) 420 { 421 throw; 422 } 423 424 completionException = e; 425 } 426 result.Complete(false, completionException); 427 } 428 finally 429 { 430 Transaction.Current = savedTransaction; 431 } 432 } 433 OnAbandon(object parameter)434 static void OnAbandon(object parameter) 435 { 436 SessionReceiveContextAsyncResult result = parameter as SessionReceiveContextAsyncResult; 437 Exception completionException = null; 438 try 439 { 440 result.receiveContext.OnAbandon(result.timeoutHelper.RemainingTime()); 441 } 442 catch (Exception e) 443 { 444 if (Fx.IsFatal(e)) 445 throw; 446 completionException = e; 447 } 448 result.Complete(false, completionException); 449 } 450 End(IAsyncResult result)451 public static void End(IAsyncResult result) 452 { 453 AsyncResult.End<SessionReceiveContextAsyncResult>(result); 454 } 455 } 456 } 457 458 class ReceiveContextTransactionEnlistment : IEnlistmentNotification 459 { 460 MsmqInputSessionChannel channel; 461 Transaction transaction; 462 ReceiveContext sessiongramReceiveContext; 463 ReceiveContextTransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction, ReceiveContext receiveContext)464 public ReceiveContextTransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction, ReceiveContext receiveContext) 465 { 466 this.channel = channel; 467 this.transaction = transaction; 468 this.sessiongramReceiveContext = receiveContext; 469 } 470 Prepare(PreparingEnlistment preparingEnlistment)471 public void Prepare(PreparingEnlistment preparingEnlistment) 472 { 473 // Abort if this happens before all messges are consumed 474 // Note that we are not placing any restriction on the channel state 475 if (this.channel.TotalPendingItems > 0 || this.channel.sessiongramDoomed) 476 { 477 Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelHasPendingItems))); 478 this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue); 479 preparingEnlistment.ForceRollback(e); 480 this.channel.Fault(); 481 } 482 else 483 { 484 Transaction savedTransaction = Transaction.Current; 485 // complete the sessiongram message within this transaction 486 try 487 { 488 Transaction.Current = this.transaction; 489 490 try 491 { 492 this.sessiongramReceiveContext.Complete(TimeSpan.MaxValue); 493 preparingEnlistment.Done(); 494 } 495 catch (MsmqException msmqex) 496 { 497 preparingEnlistment.ForceRollback(msmqex); 498 this.channel.Fault(); 499 } 500 } 501 finally 502 { 503 Transaction.Current = savedTransaction; 504 } 505 } 506 } 507 Commit(Enlistment enlistment)508 public void Commit(Enlistment enlistment) 509 { 510 this.channel.DetachTransaction(false); 511 enlistment.Done(); 512 } 513 Rollback(Enlistment enlistment)514 public void Rollback(Enlistment enlistment) 515 { 516 this.channel.DetachTransaction(true); 517 enlistment.Done(); 518 } 519 InDoubt(Enlistment enlistment)520 public void InDoubt(Enlistment enlistment) 521 { 522 enlistment.Done(); 523 } 524 } 525 526 class TransactionEnlistment : IEnlistmentNotification 527 { 528 MsmqInputSessionChannel channel; 529 Transaction transaction; 530 TransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction)531 public TransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction) 532 { 533 this.channel = channel; 534 this.transaction = transaction; 535 } 536 Prepare(PreparingEnlistment preparingEnlistment)537 public void Prepare(PreparingEnlistment preparingEnlistment) 538 { 539 // Abort if this happens before all messges are consumed 540 if (this.channel.State == CommunicationState.Opened && this.channel.InternalPendingItems > 0) 541 { 542 Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelsMustBeClosed))); 543 preparingEnlistment.ForceRollback(e); 544 this.channel.Fault(); 545 } 546 else 547 { 548 preparingEnlistment.Done(); 549 } 550 } 551 Commit(Enlistment enlistment)552 public void Commit(Enlistment enlistment) 553 { 554 enlistment.Done(); 555 } 556 Rollback(Enlistment enlistment)557 public void Rollback(Enlistment enlistment) 558 { 559 channel.Fault(); 560 enlistment.Done(); 561 } 562 InDoubt(Enlistment enlistment)563 public void InDoubt(Enlistment enlistment) 564 { 565 enlistment.Done(); 566 } 567 } 568 } 569 } 570