1 //-----------------------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //-----------------------------------------------------------------------------
4 
5 namespace System.ServiceModel.Dispatcher
6 {
7     using System.Diagnostics;
8     using System.Runtime;
9     using System.ServiceModel.Channels;
10     using System.ServiceModel.Diagnostics;
11     using System.Transactions;
12 
13     sealed class TransactedBatchContext : IEnlistmentNotification
14     {
15         SharedTransactedBatchContext shared;
16         CommittableTransaction transaction;
17         DateTime commitNotLaterThan;
18         int commits;
19         bool batchFinished;
20         bool inDispatch;
21 
TransactedBatchContext(SharedTransactedBatchContext shared)22         internal TransactedBatchContext(SharedTransactedBatchContext shared)
23         {
24             this.shared = shared;
25             this.transaction = TransactionBehavior.CreateTransaction(shared.IsolationLevel, shared.TransactionTimeout);
26             this.transaction.EnlistVolatile(this, EnlistmentOptions.None);
27             if (shared.TransactionTimeout <= TimeSpan.Zero)
28                 this.commitNotLaterThan = DateTime.MaxValue;
29             else
30                 this.commitNotLaterThan = DateTime.UtcNow + TimeSpan.FromMilliseconds(shared.TransactionTimeout.TotalMilliseconds * 4 / 5);
31             this.commits = 0;
32             this.batchFinished = false;
33             this.inDispatch = false;
34         }
35 
36         internal bool AboutToExpire
37         {
38             get
39             {
40                 return DateTime.UtcNow > this.commitNotLaterThan;
41             }
42         }
43 
44         internal bool IsActive
45         {
46             get
47             {
48                 if (this.batchFinished)
49                     return false;
50 
51                 try
52                 {
53                     return TransactionStatus.Active == this.transaction.TransactionInformation.Status;
54                 }
55                 catch (ObjectDisposedException ex)
56                 {
57                     MsmqDiagnostics.ExpectedException(ex);
58                     return false;
59                 }
60             }
61         }
62 
63         internal bool InDispatch
64         {
65             get { return this.inDispatch; }
66             set
67             {
68                 if (this.inDispatch == value)
69                 {
70                     Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.TransactedBatchContext.InDispatch: (inDispatch == value)");
71                 }
72                 this.inDispatch = value;
73                 if (this.inDispatch)
74                     this.shared.DispatchStarted();
75                 else
76                     this.shared.DispatchEnded();
77             }
78         }
79 
80         internal SharedTransactedBatchContext Shared
81         {
82             get { return this.shared; }
83         }
84 
ForceRollback()85         internal void ForceRollback()
86         {
87             try
88             {
89                 this.transaction.Rollback();
90             }
91             catch (ObjectDisposedException ex)
92             {
93                 MsmqDiagnostics.ExpectedException(ex);
94             }
95             catch (TransactionException ex)
96             {
97                 MsmqDiagnostics.ExpectedException(ex);
98             }
99 
100             this.batchFinished = true;
101         }
102 
ForceCommit()103         internal void ForceCommit()
104         {
105             try
106             {
107                 this.transaction.Commit();
108             }
109             catch (ObjectDisposedException ex)
110             {
111                 MsmqDiagnostics.ExpectedException(ex);
112             }
113             catch (TransactionException ex)
114             {
115                 MsmqDiagnostics.ExpectedException(ex);
116             }
117 
118             this.batchFinished = true;
119         }
120 
Complete()121         internal void Complete()
122         {
123             ++this.commits;
124 
125             if (this.commits >= this.shared.CurrentBatchSize || DateTime.UtcNow >= this.commitNotLaterThan)
126             {
127                 ForceCommit();
128             }
129         }
130 
IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)131         void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
132         {
133             preparingEnlistment.Prepared();
134         }
135 
IEnlistmentNotification.Commit(Enlistment enlistment)136         void IEnlistmentNotification.Commit(Enlistment enlistment)
137         {
138             this.shared.ReportCommit();
139             this.shared.BatchDone();
140             enlistment.Done();
141         }
142 
IEnlistmentNotification.Rollback(Enlistment enlistment)143         void IEnlistmentNotification.Rollback(Enlistment enlistment)
144         {
145             this.shared.ReportAbort();
146             this.shared.BatchDone();
147             enlistment.Done();
148         }
149 
IEnlistmentNotification.InDoubt(Enlistment enlistment)150         void IEnlistmentNotification.InDoubt(Enlistment enlistment)
151         {
152             this.shared.ReportAbort();
153             this.shared.BatchDone();
154             enlistment.Done();
155         }
156 
157         internal Transaction Transaction
158         {
159             get { return this.transaction; }
160         }
161     }
162 
163     sealed class SharedTransactedBatchContext
164     {
165         readonly int maxBatchSize;
166         readonly int maxConcurrentBatches;
167         readonly IsolationLevel isolationLevel;
168         readonly TimeSpan txTimeout;
169         int currentBatchSize;
170         int currentConcurrentBatches;
171         int currentConcurrentDispatches;
172         int successfullCommits;
173         object receiveLock = new object();
174         object thisLock = new object();
175         bool isBatching;
176         ChannelHandler handler;
177 
SharedTransactedBatchContext(ChannelHandler handler, ChannelDispatcher dispatcher, int maxConcurrentBatches)178         internal SharedTransactedBatchContext(ChannelHandler handler, ChannelDispatcher dispatcher, int maxConcurrentBatches)
179         {
180             this.handler = handler;
181             this.maxBatchSize = dispatcher.MaxTransactedBatchSize;
182             this.maxConcurrentBatches = maxConcurrentBatches;
183             this.currentBatchSize = dispatcher.MaxTransactedBatchSize;
184             this.currentConcurrentBatches = 0;
185             this.currentConcurrentDispatches = 0;
186             this.successfullCommits = 0;
187             this.isBatching = true;
188             this.isolationLevel = dispatcher.TransactionIsolationLevel;
189             this.txTimeout = TransactionBehavior.NormalizeTimeout(dispatcher.TransactionTimeout);
190             BatchingStateChanged(this.isBatching);
191         }
192 
CreateTransactedBatchContext()193         internal TransactedBatchContext CreateTransactedBatchContext()
194         {
195             lock (thisLock)
196             {
197                 TransactedBatchContext context = new TransactedBatchContext(this);
198                 ++this.currentConcurrentBatches;
199                 return context;
200             }
201         }
202 
DispatchStarted()203         internal void DispatchStarted()
204         {
205             lock (thisLock)
206             {
207                 ++this.currentConcurrentDispatches;
208                 if (this.currentConcurrentDispatches == this.currentConcurrentBatches && this.currentConcurrentBatches < this.maxConcurrentBatches)
209                 {
210                     TransactedBatchContext context = new TransactedBatchContext(this);
211                     ++this.currentConcurrentBatches;
212                     ChannelHandler newHandler = new ChannelHandler(this.handler, context);
213                     ChannelHandler.Register(newHandler);
214                 }
215             }
216         }
217 
DispatchEnded()218         internal void DispatchEnded()
219         {
220             lock (thisLock)
221             {
222                 --this.currentConcurrentDispatches;
223                 if (this.currentConcurrentDispatches < 0)
224                 {
225                     Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.SharedTransactedBatchContext.BatchDone: (currentConcurrentDispatches < 0)");
226                 }
227             }
228         }
229 
BatchDone()230         internal void BatchDone()
231         {
232             lock (thisLock)
233             {
234                 --this.currentConcurrentBatches;
235                 if (this.currentConcurrentBatches < 0)
236                 {
237                     Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.SharedTransactedBatchContext.BatchDone: (currentConcurrentBatches < 0)");
238                 }
239             }
240         }
241 
242         internal int CurrentBatchSize
243         {
244             get
245             {
246                 lock (thisLock)
247                 {
248                     return this.currentBatchSize;
249                 }
250             }
251         }
252 
253         internal IsolationLevel IsolationLevel
254         {
255             get
256             {
257                 return this.isolationLevel;
258             }
259         }
260 
261         internal TimeSpan TransactionTimeout
262         {
263             get
264             {
265                 return this.txTimeout;
266             }
267         }
268 
ReportAbort()269         internal void ReportAbort()
270         {
271             lock (thisLock)
272             {
273                 if (isBatching)
274                 {
275                     this.successfullCommits = 0;
276                     this.currentBatchSize = 1;
277                     this.isBatching = false;
278                     BatchingStateChanged(this.isBatching);
279                 }
280             }
281         }
282 
ReportCommit()283         internal void ReportCommit()
284         {
285             lock (thisLock)
286             {
287                 if (++this.successfullCommits >= this.maxBatchSize * 2)
288                 {
289                     this.successfullCommits = 0;
290                     if (!isBatching)
291                     {
292                         this.currentBatchSize = this.maxBatchSize;
293                         this.isBatching = true;
294                         BatchingStateChanged(this.isBatching);
295                     }
296                 }
297             }
298         }
299 
BatchingStateChanged(bool batchingNow)300         void BatchingStateChanged(bool batchingNow)
301         {
302             if (DiagnosticUtility.ShouldTraceVerbose)
303             {
304                 TraceUtility.TraceEvent(
305                     TraceEventType.Verbose,
306                     batchingNow ? TraceCode.MsmqEnteredBatch : TraceCode.MsmqLeftBatch,
307                     batchingNow ? SR.GetString(SR.TraceCodeMsmqEnteredBatch) : SR.GetString(SR.TraceCodeMsmqLeftBatch),
308                     null,
309                     null,
310                     null);
311 
312             }
313         }
314 
315         internal object ReceiveLock
316         {
317             get { return this.receiveLock; }
318         }
319     }
320 }
321