1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //------------------------------------------------------------
4 
5 namespace System.ServiceModel.Channels
6 {
7     using System.Runtime;
8     using System.ServiceModel;
9     using System.Transactions;
10     using SR = System.ServiceModel.SR;
11     using System.Threading;
12 
13     sealed class MsmqInputSessionChannel : InputChannel, IInputSessionChannel
14     {
15         IInputSession session;
16         Transaction associatedTx;
17         ReceiveContext sessiongramReceiveContext;
18         bool receiveContextEnabled;
19         bool sessiongramDoomed;
20 
21         // count of messages that have been pulled out of the base queue but Complete has not been called on them
22         int incompleteMessageCount;
23 
24         // count of messages that have been completed but the transaction has not been committed
25         int uncommittedMessageCount;
26 
MsmqInputSessionChannel(MsmqInputSessionChannelListener listener, Transaction associatedTx, ReceiveContext sessiongramReceiveContext)27         public MsmqInputSessionChannel(MsmqInputSessionChannelListener listener, Transaction associatedTx, ReceiveContext sessiongramReceiveContext)
28             : base(listener, new EndpointAddress(listener.Uri))
29         {
30             this.session = new InputSession();
31             this.incompleteMessageCount = 0;
32 
33             if (sessiongramReceiveContext == null)
34             {
35                 this.receiveContextEnabled = false;
36 
37                 // only enlist if we are running in a non-receive context mode
38                 this.associatedTx = associatedTx;
39                 this.associatedTx.EnlistVolatile(new TransactionEnlistment(this, this.associatedTx), EnlistmentOptions.None);
40             }
41             else
42             {
43                 //ignore the ambient transaction if any
44                 this.receiveContextEnabled = true;
45                 this.sessiongramReceiveContext = sessiongramReceiveContext;
46                 this.sessiongramDoomed = false;
47             }
48         }
49 
50         public IInputSession Session
51         {
52             get { return this.session; }
53         }
54 
55         int TotalPendingItems
56         {
57             get
58             {
59                 return this.InternalPendingItems + this.incompleteMessageCount;
60             }
61         }
62 
DetachTransaction(bool aborted)63         void DetachTransaction(bool aborted)
64         {
65             // disassociate the session channel from the current transaction and enlistment
66             this.associatedTx = null;
67             if (aborted)
68             {
69                 this.incompleteMessageCount += this.uncommittedMessageCount;
70             }
71             this.uncommittedMessageCount = 0;
72         }
73 
AbandonMessage(TimeSpan timeout)74         void AbandonMessage(TimeSpan timeout)
75         {
76             ThrowIfFaulted();
77             this.sessiongramDoomed = true;
78         }
79 
CompleteMessage(TimeSpan timeout)80         void CompleteMessage(TimeSpan timeout)
81         {
82             ThrowIfFaulted();
83             EnsureReceiveContextTransaction();
84 
85             // the message is now off to transaction land
86             Interlocked.Increment(ref uncommittedMessageCount);
87             Interlocked.Decrement(ref incompleteMessageCount);
88         }
89 
Receive()90         public override Message Receive()
91         {
92             return this.Receive(this.DefaultReceiveTimeout);
93         }
94 
Receive(TimeSpan timeout)95         public override Message Receive(TimeSpan timeout)
96         {
97             return InputChannel.HelpReceive(this, timeout);
98         }
99 
BeginReceive(AsyncCallback callback, object state)100         public override IAsyncResult BeginReceive(AsyncCallback callback, object state)
101         {
102             return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
103         }
104 
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)105         public override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
106         {
107             return InputChannel.HelpBeginReceive(this, timeout, callback, state);
108         }
109 
TryReceive(TimeSpan timeout, out Message message)110         public override bool TryReceive(TimeSpan timeout, out Message message)
111         {
112             ThrowIfFaulted();
113             if (CommunicationState.Closed == this.State || CommunicationState.Closing == this.State)
114             {
115                 message = null;
116                 return true;
117             }
118 
119             // we don't look at the transaction in the receive if receive context is enabled
120             if (!this.receiveContextEnabled)
121             {
122                 VerifyTransaction();
123             }
124 
125             bool receiveSuccessful = base.TryReceive(timeout, out message);
126 
127             if (receiveSuccessful && message != null && this.receiveContextEnabled)
128             {
129                 message.Properties[ReceiveContext.Name] = new MsmqSessionReceiveContext(this);
130                 Interlocked.Increment(ref incompleteMessageCount);
131             }
132 
133             return receiveSuccessful;
134         }
135 
BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)136         public override IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
137         {
138             ThrowIfFaulted();
139             if (CommunicationState.Closed == this.State || CommunicationState.Closing == this.State)
140             {
141                 return new CompletedAsyncResult<bool, Message>(true, null, callback, state);
142             }
143             // we don't look at the transaction in the receive if receive context is enabled
144             if (!this.receiveContextEnabled)
145             {
146                 VerifyTransaction();
147             }
148             return base.BeginTryReceive(timeout, callback, state);
149         }
150 
EndTryReceive(IAsyncResult result, out Message message)151         public override bool EndTryReceive(IAsyncResult result, out Message message)
152         {
153             CompletedAsyncResult<bool, Message> completedResult = result as CompletedAsyncResult<bool, Message>;
154 
155             if (null != completedResult)
156             {
157                 return CompletedAsyncResult<bool, Message>.End(result, out message);
158             }
159             else
160             {
161                 bool receiveSuccessful = base.EndTryReceive(result, out message);
162                 if (receiveSuccessful && message != null && this.receiveContextEnabled)
163                 {
164                     message.Properties[ReceiveContext.Name] = new MsmqSessionReceiveContext(this);
165                     Interlocked.Increment(ref incompleteMessageCount);
166                 }
167 
168                 return receiveSuccessful;
169             }
170         }
171 
FaultChannel()172         public void FaultChannel()
173         {
174             this.Fault();
175         }
176 
OnCloseReceiveContext(bool isAborting)177         void OnCloseReceiveContext(bool isAborting)
178         {
179             if (isAborting)
180             {
181                 // can't do much on Channel.Abort if the transaction had already committed
182                 if (this.associatedTx != null)
183                 {
184                     // Channel.Abort called within the associated transaction
185                     Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelAbort)));
186                     RollbackTransaction(e);
187                 }
188                 this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue);
189             }
190             else
191             {
192                 if (this.TotalPendingItems > 0)
193                 {
194                     // no need for rollback, it will happen automatically when this condition is hit in the Prepare() call
195                     this.Fault();
196                     this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue);
197                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionPrematureClose)));
198                 }
199             }
200         }
201 
OnCloseTransactional(bool isAborting)202         void OnCloseTransactional(bool isAborting)
203         {
204             if (isAborting)
205             {
206                 RollbackTransaction(null);
207             }
208             else
209             {
210                 VerifyTransaction();
211                 if (this.InternalPendingItems > 0)
212                 {
213                     RollbackTransaction(null);
214                     this.Fault();
215                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionMessagesNotConsumed)));
216                 }
217             }
218         }
219 
OnCloseCore(bool isAborting)220         void OnCloseCore(bool isAborting)
221         {
222             if (this.receiveContextEnabled)
223             {
224                 OnCloseReceiveContext(isAborting);
225             }
226             else
227             {
228                 OnCloseTransactional(isAborting);
229             }
230         }
231 
OnAbort()232         protected override void OnAbort()
233         {
234             OnCloseCore(true);
235             base.OnAbort();
236         }
237 
OnClose(TimeSpan timeout)238         protected override void OnClose(TimeSpan timeout)
239         {
240             OnCloseCore(false);
241             base.OnClose(timeout);
242         }
243 
OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)244         protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
245         {
246             OnCloseCore(false);
247             return base.OnBeginClose(timeout, callback, state);
248         }
249 
RollbackTransaction(Exception exception)250         void RollbackTransaction(Exception exception)
251         {
252             try
253             {
254                 if (TransactionStatus.Active == this.associatedTx.TransactionInformation.Status)
255                     this.associatedTx.Rollback(exception);
256             }
257             catch (TransactionAbortedException ex)
258             {
259                 MsmqDiagnostics.ExpectedException(ex);
260             }
261             catch (ObjectDisposedException ex)
262             {
263                 MsmqDiagnostics.ExpectedException(ex);
264             }
265         }
266 
EnsureReceiveContextTransaction()267         void EnsureReceiveContextTransaction()
268         {
269             // if this is the first time we are seeing this transaction in receivecontext enabled mode then enlist and
270             // associate the session channel with this transaction
271             if (Transaction.Current == null)
272             {
273                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionRequired)));
274             }
275 
276             if (this.associatedTx == null)
277             {
278                 this.associatedTx = Transaction.Current;
279                 this.associatedTx.EnlistVolatile(new ReceiveContextTransactionEnlistment(this, this.associatedTx, this.sessiongramReceiveContext),
280                     EnlistmentOptions.EnlistDuringPrepareRequired);
281             }
282             else
283             {
284                 if (this.associatedTx != Transaction.Current)
285                 {
286                     RollbackTransaction(null);
287                     this.Fault();
288                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqSameTransactionExpected)));
289                 }
290 
291                 if (TransactionStatus.Active != Transaction.Current.TransactionInformation.Status)
292                 {
293                     this.Fault();
294                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionNotActive)));
295                 }
296             }
297         }
298 
VerifyTransaction()299         void VerifyTransaction()
300         {
301             if (this.InternalPendingItems > 0)
302             {
303                 if (this.associatedTx != Transaction.Current)
304                 {
305                     RollbackTransaction(null);
306                     this.Fault();
307                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqSameTransactionExpected)));
308                 }
309 
310                 if (TransactionStatus.Active != Transaction.Current.TransactionInformation.Status)
311                 {
312                     RollbackTransaction(null);
313                     this.Fault();
314                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionNotActive)));
315                 }
316             }
317         }
318 
319         class InputSession : IInputSession
320         {
321             string id = "uuid://session-gram/" + Guid.NewGuid().ToString();
322 
323             public string Id
324             {
325                 get { return this.id; }
326             }
327         }
328 
329         class MsmqSessionReceiveContext : ReceiveContext
330         {
331             MsmqInputSessionChannel channel;
332 
MsmqSessionReceiveContext(MsmqInputSessionChannel channel)333             public MsmqSessionReceiveContext(MsmqInputSessionChannel channel)
334             {
335                 this.channel = channel;
336             }
337 
OnAbandon(TimeSpan timeout)338             protected override void OnAbandon(TimeSpan timeout)
339             {
340                 this.channel.AbandonMessage(timeout);
341             }
342 
OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state)343             protected override IAsyncResult OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state)
344             {
345                 return SessionReceiveContextAsyncResult.CreateAbandon(this, timeout, callback, state);
346             }
347 
OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state)348             protected override IAsyncResult OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state)
349             {
350                 return SessionReceiveContextAsyncResult.CreateComplete(this, timeout, callback, state);
351             }
352 
OnComplete(TimeSpan timeout)353             protected override void OnComplete(TimeSpan timeout)
354             {
355                 this.channel.CompleteMessage(timeout);
356             }
357 
OnEndAbandon(IAsyncResult result)358             protected override void OnEndAbandon(IAsyncResult result)
359             {
360                 SessionReceiveContextAsyncResult.End(result);
361             }
362 
OnEndComplete(IAsyncResult result)363             protected override void OnEndComplete(IAsyncResult result)
364             {
365                 SessionReceiveContextAsyncResult.End(result);
366             }
367 
368             class SessionReceiveContextAsyncResult : AsyncResult
369             {
370                 MsmqSessionReceiveContext receiveContext;
371                 Transaction completionTransaction;
372 
373                 TimeoutHelper timeoutHelper;
374                 static Action<object> onComplete;
375                 static Action<object> onAbandon;
376 
SessionReceiveContextAsyncResult(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state, Action<object> target)377                 SessionReceiveContextAsyncResult(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state, Action<object> target)
378                     : base(callback, state)
379                 {
380                     this.completionTransaction = Transaction.Current;
381                     this.timeoutHelper = new TimeoutHelper(timeout);
382                     this.receiveContext = receiveContext;
383                     ActionItem.Schedule(target, this);
384                 }
385 
CreateComplete(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)386                 public static IAsyncResult CreateComplete(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)
387                 {
388                     if (onComplete == null)
389                     {
390                         onComplete = new Action<object>(OnComplete);
391                     }
392                     return new SessionReceiveContextAsyncResult(receiveContext, timeout, callback, state, onComplete);
393                 }
394 
CreateAbandon(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)395                 public static IAsyncResult CreateAbandon(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)
396                 {
397                     if (onAbandon == null)
398                     {
399                         onAbandon = new Action<object>(OnAbandon);
400                     }
401                     return new SessionReceiveContextAsyncResult(receiveContext, timeout, callback, state, onAbandon);
402                 }
403 
OnComplete(object parameter)404                 static void OnComplete(object parameter)
405                 {
406                     SessionReceiveContextAsyncResult result = parameter as SessionReceiveContextAsyncResult;
407                     Transaction savedTransaction = Transaction.Current;
408                     Transaction.Current = result.completionTransaction;
409 
410                     try
411                     {
412                         Exception completionException = null;
413                         try
414                         {
415                             result.receiveContext.OnComplete(result.timeoutHelper.RemainingTime());
416                         }
417                         catch (Exception e)
418                         {
419                             if (Fx.IsFatal(e))
420                             {
421                                 throw;
422                             }
423 
424                             completionException = e;
425                         }
426                         result.Complete(false, completionException);
427                     }
428                     finally
429                     {
430                         Transaction.Current = savedTransaction;
431                     }
432                 }
433 
OnAbandon(object parameter)434                 static void OnAbandon(object parameter)
435                 {
436                     SessionReceiveContextAsyncResult result = parameter as SessionReceiveContextAsyncResult;
437                     Exception completionException = null;
438                     try
439                     {
440                         result.receiveContext.OnAbandon(result.timeoutHelper.RemainingTime());
441                     }
442                     catch (Exception e)
443                     {
444                         if (Fx.IsFatal(e))
445                             throw;
446                         completionException = e;
447                     }
448                     result.Complete(false, completionException);
449                 }
450 
End(IAsyncResult result)451                 public static void End(IAsyncResult result)
452                 {
453                     AsyncResult.End<SessionReceiveContextAsyncResult>(result);
454                 }
455             }
456         }
457 
458         class ReceiveContextTransactionEnlistment : IEnlistmentNotification
459         {
460             MsmqInputSessionChannel channel;
461             Transaction transaction;
462             ReceiveContext sessiongramReceiveContext;
463 
ReceiveContextTransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction, ReceiveContext receiveContext)464             public ReceiveContextTransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction, ReceiveContext receiveContext)
465             {
466                 this.channel = channel;
467                 this.transaction = transaction;
468                 this.sessiongramReceiveContext = receiveContext;
469             }
470 
Prepare(PreparingEnlistment preparingEnlistment)471             public void Prepare(PreparingEnlistment preparingEnlistment)
472             {
473                 // Abort if this happens before all messges are consumed
474                 // Note that we are not placing any restriction on the channel state
475                 if (this.channel.TotalPendingItems > 0 || this.channel.sessiongramDoomed)
476                 {
477                     Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelHasPendingItems)));
478                     this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue);
479                     preparingEnlistment.ForceRollback(e);
480                     this.channel.Fault();
481                 }
482                 else
483                 {
484                     Transaction savedTransaction = Transaction.Current;
485                     // complete the sessiongram message within this transaction
486                     try
487                     {
488                         Transaction.Current = this.transaction;
489 
490                         try
491                         {
492                             this.sessiongramReceiveContext.Complete(TimeSpan.MaxValue);
493                             preparingEnlistment.Done();
494                         }
495                         catch (MsmqException msmqex)
496                         {
497                             preparingEnlistment.ForceRollback(msmqex);
498                             this.channel.Fault();
499                         }
500                     }
501                     finally
502                     {
503                         Transaction.Current = savedTransaction;
504                     }
505                 }
506             }
507 
Commit(Enlistment enlistment)508             public void Commit(Enlistment enlistment)
509             {
510                 this.channel.DetachTransaction(false);
511                 enlistment.Done();
512             }
513 
Rollback(Enlistment enlistment)514             public void Rollback(Enlistment enlistment)
515             {
516                 this.channel.DetachTransaction(true);
517                 enlistment.Done();
518             }
519 
InDoubt(Enlistment enlistment)520             public void InDoubt(Enlistment enlistment)
521             {
522                 enlistment.Done();
523             }
524         }
525 
526         class TransactionEnlistment : IEnlistmentNotification
527         {
528             MsmqInputSessionChannel channel;
529             Transaction transaction;
530 
TransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction)531             public TransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction)
532             {
533                 this.channel = channel;
534                 this.transaction = transaction;
535             }
536 
Prepare(PreparingEnlistment preparingEnlistment)537             public void Prepare(PreparingEnlistment preparingEnlistment)
538             {
539                 // Abort if this happens before all messges are consumed
540                 if (this.channel.State == CommunicationState.Opened && this.channel.InternalPendingItems > 0)
541                 {
542                     Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelsMustBeClosed)));
543                     preparingEnlistment.ForceRollback(e);
544                     this.channel.Fault();
545                 }
546                 else
547                 {
548                     preparingEnlistment.Done();
549                 }
550             }
551 
Commit(Enlistment enlistment)552             public void Commit(Enlistment enlistment)
553             {
554                 enlistment.Done();
555             }
556 
Rollback(Enlistment enlistment)557             public void Rollback(Enlistment enlistment)
558             {
559                 channel.Fault();
560                 enlistment.Done();
561             }
562 
InDoubt(Enlistment enlistment)563             public void InDoubt(Enlistment enlistment)
564             {
565                 enlistment.Done();
566             }
567         }
568     }
569 }
570