1 using System;
2 using System.IO;
3 using System.Transactions;
4 using System.Diagnostics;
5 using System.Data;
6 using System.Data.Common;
7 using System.Data.SqlTypes;
8 using System.Data.SqlClient;
9 using System.Runtime.Serialization;
10 using System.Runtime.Serialization.Formatters.Binary;
11 using System.Collections;
12 using System.Collections.Generic;
13 using System.Collections.Specialized;
14 using System.Configuration;
15 using System.Text.RegularExpressions;
16 using System.Security.Permissions;
17 using System.Threading;
18 
19 using System.Workflow.Runtime.Hosting;
20 using System.Workflow.Runtime;
21 using System.Workflow.ComponentModel;
22 using System.Globalization;
23 
24 namespace System.Workflow.Runtime.Hosting
25 {
26     #region PersistenceDBAccessor
27 
28 
29     internal sealed class PendingWorkItem
30     {
31         public enum ItemType { Instance, CompletedScope, ActivationComplete };
32 
33         public ItemType Type;
34         public Guid InstanceId;
35         public Guid StateId;
36         public Byte[] SerializedActivity;
37         public int Status;
38         public int Blocked;
39         public string Info;
40         public bool Unlocked;
41         public SqlDateTime NextTimer;
42     }
43 
44     /// <summary>
45     /// This class does DB accessing work in the context of one connection
46     /// </summary>
47     internal sealed class PersistenceDBAccessor : IDisposable
48     {
49 
50         DbResourceAllocator dbResourceAllocator;
51         DbTransaction localTransaction;
52         DbConnection connection;
53         bool needToCloseConnection;
54         DbRetry dbRetry = null;
55 
56         private class RetryReadException : Exception
57         {
58         }
59 
60 #if DEBUG
61         private static Dictionary<System.Guid, string> _persistenceToDatabaseMap = new Dictionary<Guid, string>();
62 
InsertToDbMap(Guid serviceId, string dbName)63         private static void InsertToDbMap(Guid serviceId, string dbName)
64         {
65             lock (_persistenceToDatabaseMap)
66             {
67                 if (!_persistenceToDatabaseMap.ContainsKey(serviceId))
68                 {
69                     _persistenceToDatabaseMap[serviceId] = dbName;
70                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): writing to database: {1}", serviceId.ToString(), dbName);
71                 }
72             }
73         }
74 #endif
75         #region Constructor/Disposer
76 
77 
78         /// <summary>
79         /// DB access done under a transaction and uses a connection enlisted to this transaction.
80         /// </summary>
81         /// <param name="dbResourceAllocator">Helper to get database connection/command/store procedure parameters/etc</param>
82         /// <param name="transaction">The transaction to do this work under</param>
PersistenceDBAccessor( DbResourceAllocator dbResourceAllocator, System.Transactions.Transaction transaction, WorkflowCommitWorkBatchService transactionService)83         internal PersistenceDBAccessor(
84             DbResourceAllocator dbResourceAllocator,
85             System.Transactions.Transaction transaction,
86             WorkflowCommitWorkBatchService transactionService)
87         {
88             this.dbResourceAllocator = dbResourceAllocator;
89             this.localTransaction = DbResourceAllocator.GetLocalTransaction(
90                 transactionService, transaction);
91             // Get a connection enlisted to this transaction, may or may not need to be freed depending on
92             // if the transaction does connection sharing
93             this.connection = this.dbResourceAllocator.GetEnlistedConnection(
94                 transactionService, transaction, out needToCloseConnection);
95             //
96             // No retries for external transactions
97             this.dbRetry = new DbRetry(false);
98         }
99 
100         /// <summary>
101         /// DB access done without a transaction in a newly opened connection
102         /// </summary>
103         /// <param name="dbResourceAllocator">Helper to get database connection/command/store procedure parameters/etc</param>
PersistenceDBAccessor(DbResourceAllocator dbResourceAllocator, bool enableRetries)104         internal PersistenceDBAccessor(DbResourceAllocator dbResourceAllocator, bool enableRetries)
105         {
106             this.dbResourceAllocator = dbResourceAllocator;
107             this.dbRetry = new DbRetry(enableRetries);
108             DbConnection conn = null;
109             short count = 0;
110             while (true)
111             {
112                 try
113                 {
114                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService OpenConnection start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
115                     conn = this.dbResourceAllocator.OpenNewConnection();
116                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService. OpenConnection end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
117 
118                     if ((null == conn) || (ConnectionState.Open != conn.State))
119                         throw new InvalidOperationException(ExecutionStringManager.InvalidConnection);
120                     break;
121                 }
122                 catch (Exception e)
123                 {
124                     WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService caught exception from OpenConnection: " + e.ToString());
125 
126                     if (dbRetry.TryDoRetry(ref count))
127                     {
128                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService retrying.");
129                         continue;
130                     }
131                     throw;
132                 }
133             }
134             connection = conn;
135             needToCloseConnection = true;
136         }
137 
138 
Dispose()139         public void Dispose()
140         {
141             if (needToCloseConnection)
142             {
143                 Debug.Assert(this.connection != null, "No connection to dispose");
144                 this.connection.Dispose();
145             }
146         }
147 
148         #endregion Constructor/Disposer
149 
150         #region Public Methods Exposed for the Batch to call
151 
DbOwnerId(Guid ownerId)152         private object DbOwnerId(Guid ownerId)
153         {
154             // Empty guid signals no lock, but the database uses null for that, so convert empty to null
155             if (ownerId == Guid.Empty)
156                 return null;
157             return ownerId;
158         }
159 
InsertInstanceState(PendingWorkItem item, Guid ownerId, DateTime ownedUntil)160         public void InsertInstanceState(PendingWorkItem item, Guid ownerId, DateTime ownedUntil)
161         {
162             /* sproc params
163             @uidInstanceID uniqueidentifier,
164             @state image,
165             @status int,
166             @artifacts image,
167             @queueingState image,
168             @unlocked int,
169             @blocked int,
170             @info ntext,
171             @ownerId uniqueidentifier,
172             @ownedUntil datetime
173             @nextTimer datetime
174             */
175             DbCommand command = NewStoredProcCommand("InsertInstanceState");
176             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@uidInstanceID", item.InstanceId));
177             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@state", item.SerializedActivity));
178             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@status", item.Status));
179             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@unlocked", item.Unlocked));
180             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@blocked", item.Blocked));
181             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@info", item.Info));
182             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil));
183             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId)));
184             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@nextTimer", item.NextTimer));
185             DbParameter p1 = this.dbResourceAllocator.NewDbParameter();
186             p1.ParameterName = "@result";
187             p1.DbType = DbType.Int32;
188             p1.Value = 0;
189             p1.Direction = ParameterDirection.InputOutput;
190             command.Parameters.Add(p1);
191             //command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@result", DbType.Int32, ParameterDirection.Output));
192             DbParameter p2 = this.dbResourceAllocator.NewDbParameter();
193             p2.ParameterName = "@currentOwnerID";
194             p2.DbType = DbType.Guid;
195             p2.Value = Guid.Empty;
196             p2.Direction = ParameterDirection.InputOutput;
197             command.Parameters.Add(p2);
198             //command.Parameters.Add(new DbParameter(this.dbResourceAllocator.NewDbParameter("@currentOwnerID", DbType.Guid, ParameterDirection.InputOutput));
199 #if DEBUG
200             InsertToDbMap(ownerId, connection.Database);
201 #endif
202             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): inserting instance: {1}, unlocking: {2} database: {3}", ownerId.ToString(), item.InstanceId.ToString(), item.Unlocked.ToString(), connection.Database);
203             //
204             // Cannot retry locally here as we don't own the tx
205             // Rely on external retries at the batch commit or workflow level (for tx scopes)
206             command.ExecuteNonQuery();
207             CheckOwnershipResult(command);
208         }
209 
210 
InsertCompletedScope(Guid instanceId, Guid scopeId, Byte[] state)211         public void InsertCompletedScope(Guid instanceId, Guid scopeId, Byte[] state)
212         {
213             /* sproc params
214             @completedScopeID uniqueidentifier,
215             @state image
216             */
217             DbCommand command = NewStoredProcCommand("InsertCompletedScope");
218             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("instanceID", instanceId));
219             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("completedScopeID", scopeId));
220             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("state", state));
221             //
222             // Cannot retry locally here as we don't own the tx
223             // Rely on external retries at the batch commit or workflow level (for tx scopes)
224             command.ExecuteNonQuery();
225         }
226 
ActivationComplete(Guid instanceId, Guid ownerId)227         public void ActivationComplete(Guid instanceId, Guid ownerId)
228         {
229             /* sproc params
230             @instanceID uniqueidentifier,
231             @ownerID uniqueidentifier,
232             */
233             DbCommand command = NewStoredProcCommand("UnlockInstanceState");
234             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@uidInstanceID", instanceId));
235             command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId)));
236 #if DEBUG
237             InsertToDbMap(ownerId, connection.Database);
238 #endif
239             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): unlocking instance: {1}, database: {2}", ownerId.ToString(), instanceId.ToString(), connection.Database);
240             command.ExecuteNonQuery();
241         }
242 
RetrieveNonblockingInstanceStateIds(Guid ownerId, DateTime ownedUntil)243         public IList<Guid> RetrieveNonblockingInstanceStateIds(Guid ownerId, DateTime ownedUntil)
244         {
245             List<Guid> gs = null;
246             DbDataReader dr = null;
247             short count = 0;
248             while (true)
249             {
250                 try
251                 {
252                     //
253                     // Check and reset the connection as needed before building the command
254                     if ((null == connection) || (ConnectionState.Open != connection.State))
255                         ResetConnection();
256 
257                     DbCommand command = NewStoredProcCommand("RetrieveNonblockingInstanceStateIds");
258                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil));
259                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId)));
260                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@now", DateTime.UtcNow));
261 
262                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
263                     dr = command.ExecuteReader(CommandBehavior.CloseConnection);
264                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
265 
266                     gs = new List<Guid>();
267                     while (dr.Read())
268                     {
269                         gs.Add(dr.GetGuid(0));
270                     }
271                     break;
272                 }
273                 catch (Exception e)
274                 {
275                     WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds caught exception from ExecuteReader: " + e.ToString());
276 
277                     if (dbRetry.TryDoRetry(ref count))
278                     {
279                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds retrying.");
280                         continue;
281                     }
282                     throw;
283                 }
284                 finally
285                 {
286                     if (dr != null)
287                         dr.Close();
288                 }
289             }
290 
291             return gs;
292         }
293 
TryRetrieveANonblockingInstanceStateId(Guid ownerId, DateTime ownedUntil, out Guid instanceId)294         public bool TryRetrieveANonblockingInstanceStateId(Guid ownerId, DateTime ownedUntil, out Guid instanceId)
295         {
296             short count = 0;
297             while (true)
298             {
299                 try
300                 {
301                     //
302                     // Check and reset the connection as needed before building the command
303                     if ((null == connection) || (ConnectionState.Open != connection.State))
304                         ResetConnection();
305 
306                     DbCommand command = NewStoredProcCommand("RetrieveANonblockingInstanceStateId");
307                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil));
308                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId)));
309                     DbParameter p2 = this.dbResourceAllocator.NewDbParameter();
310                     p2.ParameterName = "@uidInstanceID";
311                     p2.DbType = DbType.Guid;
312                     p2.Value = null;
313                     p2.Direction = ParameterDirection.InputOutput;
314 
315                     command.Parameters.Add(p2);
316 
317                     DbParameter found = this.dbResourceAllocator.NewDbParameter();
318                     found.ParameterName = "@found";
319                     found.DbType = DbType.Boolean;
320                     found.Value = null;
321                     found.Direction = ParameterDirection.Output;
322 
323                     command.Parameters.Add(found);
324 
325                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
326                     command.ExecuteNonQuery();
327                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
328 
329                     if ((null != found.Value) && ((bool)found.Value))
330                     {
331                         instanceId = (Guid)p2.Value;
332                         return true;
333                     }
334                     else
335                     {
336                         instanceId = Guid.Empty;
337                         return false;
338                     }
339                 }
340                 catch (Exception e)
341                 {
342                     WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId caught exception from ExecuteNonQuery: " + e.ToString());
343 
344                     if (dbRetry.TryDoRetry(ref count))
345                     {
346                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId retrying.");
347                         continue;
348                     }
349                     throw;
350                 }
351             }
352 
353         }
354 
355 
RetrieveExpiredTimerIds(Guid ownerId, DateTime ownedUntil)356         public IList<Guid> RetrieveExpiredTimerIds(Guid ownerId, DateTime ownedUntil)
357         {
358             List<Guid> gs = null;
359             DbDataReader dr = null;
360 
361             short count = 0;
362             while (true)
363             {
364                 try
365                 {
366                     //
367                     // Check and reset the connection as needed before building the command
368                     if ((null == connection) || (ConnectionState.Open != connection.State))
369                         ResetConnection();
370 
371                     DbCommand command = NewStoredProcCommand("RetrieveExpiredTimerIds");
372                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil));
373                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId)));
374                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@now", DateTime.UtcNow));
375 
376                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
377                     dr = command.ExecuteReader(CommandBehavior.CloseConnection);
378                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
379 
380                     gs = new List<Guid>();
381                     while (dr.Read())
382                     {
383                         gs.Add(dr.GetGuid(0));
384                     }
385                     break;
386                 }
387                 catch (Exception e)
388                 {
389                     WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds caught exception from ExecuteReader: " + e.ToString());
390 
391                     if (dbRetry.TryDoRetry(ref count))
392                     {
393                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds retrying.");
394                         continue;
395                     }
396                     throw;
397                 }
398                 finally
399                 {
400                     if (dr != null)
401                         dr.Close();
402                 }
403             }
404 
405             return gs;
406         }
407 
RetrieveInstanceState(Guid instanceStateId, Guid ownerId, DateTime timeout)408         public Byte[] RetrieveInstanceState(Guid instanceStateId, Guid ownerId, DateTime timeout)
409         {
410             short count = 0;
411             byte[] state = null;
412             while (true)
413             {
414                 try
415                 {
416                     //
417                     // Check and reset the connection as needed before building the command
418                     if ((null == connection) || (ConnectionState.Open != connection.State))
419                         ResetConnection();
420 
421                     DbCommand command = NewStoredProcCommand("RetrieveInstanceState");
422                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@uidInstanceID", instanceStateId));
423                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId)));
424                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", timeout == DateTime.MaxValue ? SqlDateTime.MaxValue : timeout));
425                     DbParameter p1 = this.dbResourceAllocator.NewDbParameter();
426                     p1.ParameterName = "@result";
427                     p1.DbType = DbType.Int32;
428                     p1.Value = 0;
429                     p1.Direction = ParameterDirection.InputOutput;
430                     command.Parameters.Add(p1);
431                     //command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@result", DbType.Int32, ParameterDirection.Output));
432                     DbParameter p2 = this.dbResourceAllocator.NewDbParameter();
433                     p2.ParameterName = "@currentOwnerID";
434                     p2.DbType = DbType.Guid;
435                     p2.Value = Guid.Empty;
436                     p2.Direction = ParameterDirection.InputOutput;
437                     command.Parameters.Add(p2);
438                     //command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@currentOwnerID", DbType.Guid, ParameterDirection.InputOutput));
439 #if DEBUG
440                     InsertToDbMap(ownerId, connection.Database);
441 #endif
442                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): retreiving instance: {1}, database: {2}", ownerId.ToString(), instanceStateId.ToString(), connection.Database);
443                     state = RetrieveStateFromDB(command, true, instanceStateId);
444 
445                     break;
446                 }
447                 catch (Exception e)
448                 {
449                     WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveInstanceState caught exception: " + e.ToString());
450 
451                     if (dbRetry.TryDoRetry(ref count))
452                     {
453                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveInstanceState retrying.");
454                         continue;
455                     }
456                     else if (e is RetryReadException)    // ### hardcoded retry to work around sql ADM64 read bug ###
457                     {
458                         count++;
459                         if (count < 10)
460                             continue;
461                         else
462                             break;  // give up
463                     }
464                     throw;
465                 }
466             }
467 
468             if (state == null || state.Length == 0)
469             {
470                 Exception e = new InvalidOperationException(string.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.InstanceNotFound, instanceStateId));
471                 e.Data["WorkflowNotFound"] = true;
472                 throw e;
473             }
474 
475             return state;
476         }
477 
RetrieveCompletedScope(Guid scopeId)478         public Byte[] RetrieveCompletedScope(Guid scopeId)
479         {
480             short count = 0;
481             byte[] state = null;
482             while (true)
483             {
484                 try
485                 {
486                     //
487                     // Check and reset the connection as needed before building the command
488                     if ((null == connection) || (ConnectionState.Open != connection.State))
489                         ResetConnection();
490 
491                     DbCommand command = NewStoredProcCommand("RetrieveCompletedScope");
492                     command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@completedScopeID", scopeId));
493                     DbParameter p1 = this.dbResourceAllocator.NewDbParameter();
494                     p1.ParameterName = "@result";
495                     p1.DbType = DbType.Int32;
496                     p1.Value = 0;
497                     p1.Direction = ParameterDirection.InputOutput;
498                     command.Parameters.Add(p1);
499 
500                     state = RetrieveStateFromDB(command, false, WorkflowEnvironment.WorkflowInstanceId);
501 
502                     break;
503                 }
504                 catch (Exception e)
505                 {
506                     WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveCompletedScope caught exception: " + e.ToString());
507 
508                     if (dbRetry.TryDoRetry(ref count))
509                     {
510                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveCompletedScope retrying.");
511                         continue;
512                     }
513                     else if (e is RetryReadException)    // ### hardcoded retry to work around sql ADM64 read bug ###
514                     {
515                         count++;
516                         if (count < 10)
517                             continue;
518                         else
519                             break;  // give up
520                     }
521                     throw;
522                 }
523             }
524 
525             if (state == null || state.Length == 0)
526                 throw new InvalidOperationException(
527                     string.Format(Thread.CurrentThread.CurrentCulture,
528                     ExecutionStringManager.CompletedScopeNotFound, scopeId));
529             return state;
530         }
531 
532         #endregion
533 
534         #region private DB accessing methods
535 
536         /// <summary>
537         /// This should only be called for non batch commits
538         /// </summary>
539         /// <returns></returns>
ResetConnection()540         private DbConnection ResetConnection()
541         {
542             if (null != localTransaction)
543                 throw new InvalidOperationException(ExecutionStringManager.InvalidOpConnectionReset);
544 
545             if (!needToCloseConnection)
546                 throw new InvalidOperationException(ExecutionStringManager.InvalidOpConnectionNotLocal);
547 
548             if ((null != connection) && (ConnectionState.Closed != connection.State))
549                 connection.Close();
550             connection.Dispose();
551 
552             connection = this.dbResourceAllocator.OpenNewConnection();
553 
554             return connection;
555         }
556 
557         /// <summary>
558         /// Returns a stored procedure type DBCommand object with current connection and transaction
559         /// </summary>
560         /// <param name="commandText"></param>
561         /// <returns>the command object</returns>
NewStoredProcCommand(string commandText)562         private DbCommand NewStoredProcCommand(string commandText)
563         {
564             DbCommand command = DbResourceAllocator.NewCommand(commandText, this.connection, this.localTransaction);
565             command.CommandType = CommandType.StoredProcedure;
566 
567             return command;
568         }
569 
CheckOwnershipResult(DbCommand command)570         private static void CheckOwnershipResult(DbCommand command)
571         {
572             DbParameter result = command.Parameters["@result"];
573             if (result != null && result.Value != null && (int)result.Value == -2)   // -2 is an ownership conflict
574             {
575                 if (command.Parameters.Contains("@currentOwnerID"))
576                 {
577                     Guid currentOwnerId = Guid.Empty;
578                     if (command.Parameters["@currentOwnerID"].Value is System.Guid)
579                         currentOwnerId = (Guid)command.Parameters["@currentOwnerID"].Value;
580                     Guid myId = (Guid)command.Parameters["@ownerID"].Value;
581                     Guid instId = (Guid)command.Parameters["@uidInstanceID"].Value;
582                     WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): owership violation with {1} on instance {2}", myId.ToString(), currentOwnerId.ToString(), instId);
583                 }
584                 DbParameter instanceId = command.Parameters["@uidInstanceID"];
585                 throw new WorkflowOwnershipException((Guid)instanceId.Value);
586             }
587         }
588 
589         /// <summary>
590         /// Helper to Public methods RetrieveInstanceState and RetrieveCompletedScope.
591         /// Retrieves an object from the DB by calling the specified stored procedure with specified stored proc params.
592         /// </summary>
593         /// <param name="command">Contains the stored procedure setting to be used to query against the Database</param>
594         /// <returns>an object to be casted to an activity
595         ///     In case of RetrieveInstanceState, only running or suspended instances are returned and
596         ///     exception is thrown for completed/terminated/not-found instances
597         /// </returns>
RetrieveStateFromDB(DbCommand command, bool checkOwnership, Guid instanceId)598         private static Byte[] RetrieveStateFromDB(DbCommand command, bool checkOwnership, Guid instanceId)
599         {
600             DbDataReader dr = null;
601             byte[] result = null;
602 
603             try
604             {
605                 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB {0} ExecuteReader start: {1}", instanceId, DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
606                 dr = command.ExecuteReader();
607                 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB {0} ExecuteReader end: {1}", instanceId, DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
608 
609                 if (dr.Read())
610                 {
611                     result = (byte[])dr.GetValue(0);
612                 }
613                 else
614                 {
615                     DbParameter resultParam = command.Parameters["@result"];
616                     if (resultParam == null || resultParam.Value == null)
617                     {
618                         WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB Failed to read results {0}", instanceId);
619                     }
620                     else if ((int)resultParam.Value > 0)    // found results but failed to read - sql bug - retry the query
621                     {
622                         WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB Failed to read results {1}, @result == {0}", (int)resultParam.Value, instanceId);
623                         throw new RetryReadException();
624                     }
625                 }
626             }
627             finally
628             {
629                 if (dr != null)
630                     dr.Close();
631             }
632 
633             if (checkOwnership)
634                 CheckOwnershipResult(command);
635 
636             return result;
637         }
638 
639         #endregion private DB accessing methods
640 
641 
RetrieveAllInstanceDescriptions()642         internal IEnumerable<SqlPersistenceWorkflowInstanceDescription> RetrieveAllInstanceDescriptions()
643         {
644             List<SqlPersistenceWorkflowInstanceDescription> retval = new List<SqlPersistenceWorkflowInstanceDescription>();
645             DbDataReader dr = null;
646             try
647             {
648                 DbCommand command = NewStoredProcCommand("RetrieveAllInstanceDescriptions");
649                 dr = command.ExecuteReader(CommandBehavior.CloseConnection);
650                 while (dr.Read())
651                 {
652                     retval.Add(new SqlPersistenceWorkflowInstanceDescription(
653                         dr.GetGuid(0),
654                         (WorkflowStatus)dr.GetInt32(1),
655                         dr.GetInt32(2) == 1 ? true : false,
656                         dr.GetString(3),
657                         (SqlDateTime)dr.GetDateTime(4)
658                     ));
659                 }
660             }
661             finally
662             {
663                 if (dr != null)
664                     dr.Close();
665             }
666             return retval;
667         }
668     }
669     #endregion
670 
671     [Obsolete("The System.Workflow.* types are deprecated.  Instead, please use the new types from System.Activities.*")]
672     public class SqlWorkflowPersistenceService : WorkflowPersistenceService, IPendingWork
673     {
        #region constants
        // Names of the configuration parameters recognized by the NameValueCollection constructor
        // (EnableRetries is additionally looked up in the runtime's common parameters during Start).
        private const string InstanceOwnershipTimeoutSecondsToken = "OwnershipTimeoutSeconds";
        private const string UnloadOnIdleToken = "UnloadOnIdle";
        private const string EnableRetriesToken = "EnableRetries";

        #endregion constants

        // Passed to every PersistenceDBAccessor created for read operations.
        private bool _enableRetries = false;
        // Set to true once EnableRetries has been given explicitly (property setter or constructor
        // parameter); Start() then skips the lookup in the common configuration section.
        private bool _ignoreCommonEnableRetries = false;
        // Created in Start() from the saved constructor input.
        private DbResourceAllocator _dbResourceAllocator;
        // Resolved from the runtime in Start(); handed to the accessor used by Commit.
        private WorkflowCommitWorkBatchService _transactionService;
        // Stays Guid.Empty unless a constructor generates a new id; RecoverRunningWorkflowInstances
        // treats Guid.Empty as the "only one host" case.
        private Guid _serviceInstanceId = Guid.Empty;
        // Added to DateTime.UtcNow by OwnershipTimeout; TimeSpan.MaxValue maps to DateTime.MaxValue.
        private TimeSpan _ownershipDelta;
        // Value returned by UnloadOnIdle.
        private Boolean _unloadOnIdle;
        const string LoadingIntervalToken = "LoadIntervalSeconds";
        // Due time and period of loadingTimer; defaults to 2 minutes. Zero or less disables the timer (see OnStarted).
        TimeSpan loadingInterval = new TimeSpan(0, 2, 0);
        // Upper bound (365 days) enforced by the constructors that accept a loading interval.
        private TimeSpan maxLoadingInterval = new TimeSpan(365, 0, 0, 0, 0);
        // Created in OnStarted, disposed and cleared in Stop.
        SmartTimer loadingTimer;
        // Held while loadingTimer is created (OnStarted), disposed (Stop) and while its callback
        // (LoadWorkflowsWithExpiredTimers) runs.
        object timerLock = new object();
        // NOTE(review): TimeSpan(long) takes ticks, so this is -1 tick rather than -1 millisecond.
        // The field is not referenced anywhere else in this class.
        TimeSpan infinite = new TimeSpan(Timeout.Infinite);
        // Compared against SqlException.Number in Commit; 1205 is the SQL Server deadlock-victim error number.
        private static int _deadlock = 1205;

        // Saved from constructor input to be used in service start initialization
        NameValueCollection configParameters;
        string unvalidatedConnectionString;
700 
701         public Guid ServiceInstanceId
702         {
703             get
704             {
705                 return _serviceInstanceId;
706             }
707         }
708 
709         public TimeSpan LoadingInterval
710         {
711             get { return loadingInterval; }
712         }
713 
714         public bool EnableRetries
715         {
716             get { return _enableRetries; }
717             set
718             {
719                 _enableRetries = value;
720                 _ignoreCommonEnableRetries = true;
721             }
722         }
723 
724         private DateTime OwnershipTimeout
725         {
726             get
727             {
728                 DateTime timeout;
729                 if (_ownershipDelta == TimeSpan.MaxValue)
730                     timeout = DateTime.MaxValue;
731                 else
732                     timeout = DateTime.UtcNow + _ownershipDelta;
733                 return timeout;
734             }
735         }
736 
SqlWorkflowPersistenceService(string connectionString)737         public SqlWorkflowPersistenceService(string connectionString)
738         {
739             if (String.IsNullOrEmpty(connectionString))
740                 throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString);
741 
742             this.unvalidatedConnectionString = connectionString;
743         }
744 
SqlWorkflowPersistenceService(NameValueCollection parameters)745         public SqlWorkflowPersistenceService(NameValueCollection parameters)
746         {
747             if (parameters == null)
748                 throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters);
749 
750             _ownershipDelta = TimeSpan.MaxValue;   // default is to never timeout
751             if (parameters != null)
752             {
753                 foreach (string key in parameters.Keys)
754                 {
755                     if (key.Equals(DbResourceAllocator.ConnectionStringToken, StringComparison.OrdinalIgnoreCase))
756                     {
757                         // the resource allocator (below) will process the connection string
758                     }
759                     else if (key.Equals(SqlWorkflowPersistenceService.InstanceOwnershipTimeoutSecondsToken, StringComparison.OrdinalIgnoreCase))
760                     {
761                         int seconds = Convert.ToInt32(parameters[SqlWorkflowPersistenceService.InstanceOwnershipTimeoutSecondsToken], System.Globalization.CultureInfo.CurrentCulture);
762                         if (seconds < 0)
763                             throw new ArgumentOutOfRangeException(InstanceOwnershipTimeoutSecondsToken, seconds, ExecutionStringManager.InvalidOwnershipTimeoutValue);
764                         _ownershipDelta = new TimeSpan(0, 0, seconds);
765                         _serviceInstanceId = Guid.NewGuid();
766                         continue;
767                     }
768                     else if (key.Equals(SqlWorkflowPersistenceService.UnloadOnIdleToken, StringComparison.OrdinalIgnoreCase))
769                     {
770                         _unloadOnIdle = bool.Parse(parameters[key]);
771                     }
772                     else if (key.Equals(LoadingIntervalToken, StringComparison.OrdinalIgnoreCase))
773                     {
774                         int interval = int.Parse(parameters[key], CultureInfo.CurrentCulture);
775                         if (interval > 0)
776                             this.loadingInterval = new TimeSpan(0, 0, interval);
777                         else
778                             this.loadingInterval = TimeSpan.Zero;
779                         if (this.loadingInterval > maxLoadingInterval)
780                             throw new ArgumentOutOfRangeException(LoadingIntervalToken, this.LoadingInterval, ExecutionStringManager.LoadingIntervalTooLarge);
781                     }
782                     else if (key.Equals(SqlWorkflowPersistenceService.EnableRetriesToken, StringComparison.OrdinalIgnoreCase))
783                     {
784                         //
785                         // We have a local value for enable retries
786                         _enableRetries = bool.Parse(parameters[key]);
787                         _ignoreCommonEnableRetries = true;
788                     }
789                     else
790                     {
791                         throw new ArgumentException(
792                             String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key), "parameters");
793                     }
794                 }
795             }
796 
797             this.configParameters = parameters;
798         }
799 
SqlWorkflowPersistenceService(string connectionString, bool unloadOnIdle, TimeSpan instanceOwnershipDuration, TimeSpan loadingInterval)800         public SqlWorkflowPersistenceService(string connectionString, bool unloadOnIdle, TimeSpan instanceOwnershipDuration, TimeSpan loadingInterval)
801         {
802             if (String.IsNullOrEmpty(connectionString))
803                 throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString);
804             if (loadingInterval > maxLoadingInterval)
805                 throw new ArgumentOutOfRangeException("loadingInterval", loadingInterval, ExecutionStringManager.LoadingIntervalTooLarge);
806             if (instanceOwnershipDuration < TimeSpan.Zero)
807                 throw new ArgumentOutOfRangeException("instanceOwnershipDuration", instanceOwnershipDuration, ExecutionStringManager.InvalidOwnershipTimeoutValue);
808 
809             this._ownershipDelta = instanceOwnershipDuration;
810             this._unloadOnIdle = unloadOnIdle;
811             this.loadingInterval = loadingInterval;
812             this.unvalidatedConnectionString = connectionString;
813             _serviceInstanceId = Guid.NewGuid();
814         }
815 
816         #region WorkflowRuntimeService
817 
Start()818         override protected internal void Start()
819         {
820             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({1}): Starting, LoadInternalSeconds={0}", loadingInterval.TotalSeconds, _serviceInstanceId.ToString());
821 
822             _dbResourceAllocator = new DbResourceAllocator(this.Runtime, this.configParameters, this.unvalidatedConnectionString);
823 
824             // Check connection string mismatch if using SharedConnectionWorkflowTransactionService
825             _transactionService = Runtime.GetService<WorkflowCommitWorkBatchService>();
826             _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService);
827 
828             //
829             // If we didn't find a local value for enable retries
830             // check in the common section
831             if ((!_ignoreCommonEnableRetries) && (null != base.Runtime))
832             {
833                 NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters;
834                 if (commonConfigurationParameters != null)
835                 {
836                     // Then scan for connection string in the common configuration parameters section
837                     foreach (string key in commonConfigurationParameters.AllKeys)
838                     {
839                         if (string.Compare(EnableRetriesToken, key, StringComparison.OrdinalIgnoreCase) == 0)
840                         {
841                             _enableRetries = bool.Parse(commonConfigurationParameters[key].Value);
842                             break;
843                         }
844                     }
845                 }
846             }
847 
848             base.Start();
849         }
850 
OnStarted()851         protected override void OnStarted()
852         {
853             if (loadingInterval > TimeSpan.Zero)
854             {
855                 lock (timerLock)
856                 {
857                     base.OnStarted();
858                     loadingTimer = new SmartTimer(new TimerCallback(LoadWorkflowsWithExpiredTimers), null, loadingInterval, loadingInterval);
859                 }
860             }
861             RecoverRunningWorkflowInstances();
862         }
863 
Stop()864         protected internal override void Stop()
865         {
866             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): Stopping", _serviceInstanceId.ToString());
867             lock (timerLock)
868             {
869                 base.Stop();
870                 if (loadingTimer != null)
871                 {
872                     loadingTimer.Dispose();
873                     loadingTimer = null;
874                 }
875             }
876         }
877 
878         #endregion WorkflowRuntimeService
879 
RecoverRunningWorkflowInstances()880         private void RecoverRunningWorkflowInstances()
881         {
882             if (Guid.Empty == _serviceInstanceId)
883             {
884                 //
885                 // Only one host, get all the ids in one go
886                 IList<Guid> instanceIds = null;
887                 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries))
888                 {
889                     instanceIds = persistenceDBAccessor.RetrieveNonblockingInstanceStateIds(_serviceInstanceId, OwnershipTimeout);
890                 }
891                 foreach (Guid instanceId in instanceIds)
892                 {
893                     try
894                     {
895                         WorkflowInstance instance = Runtime.GetWorkflow(instanceId);
896                         instance.Load();
897                     }
898                     catch (Exception e)
899                     {
900                         RaiseServicesExceptionNotHandledEvent(e, instanceId);
901                     }
902                 }
903             }
904             else
905             {
906                 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries))
907                 {
908                     //
909                     // Load one at a time to avoid thrashing with other hosts
910                     Guid instanceId;
911                     while (persistenceDBAccessor.TryRetrieveANonblockingInstanceStateId(_serviceInstanceId, OwnershipTimeout, out instanceId))
912                     {
913                         try
914                         {
915                             WorkflowInstance instance = Runtime.GetWorkflow(instanceId);
916                             instance.Load();
917                         }
918                         catch (Exception e)
919                         {
920                             RaiseServicesExceptionNotHandledEvent(e, instanceId);
921                         }
922                     }
923                 }
924             }
925         }
926 
927 
LoadWorkflowsWithExpiredTimers(object ignored)928         private void LoadWorkflowsWithExpiredTimers(object ignored)
929         {
930             lock (timerLock)
931             {
932                 if (this.State == WorkflowRuntimeServiceState.Started)
933                 {
934                     IList<Guid> ids = null;
935                     try
936                     {
937 
938                         ids = LoadExpiredTimerIds();
939                     }
940                     catch (Exception e)
941                     {
942                         RaiseServicesExceptionNotHandledEvent(e, Guid.Empty);
943                     }
944                     if (ids != null)
945                     {
946                         foreach (Guid id in ids)
947                         {
948                             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({1}): Loading instance with expired timers {0}", id, _serviceInstanceId.ToString());
949                             try
950                             {
951                                 Runtime.GetWorkflow(id).Load();
952                             }
953                             // Ignore cases where the workflow has been stolen out from under us
954                             catch (WorkflowOwnershipException)
955                             { }
956                             catch (ObjectDisposedException)
957                             {
958                                 throw;
959                             }
960                             catch (InvalidOperationException ioe)
961                             {
962                                 if (!ioe.Data.Contains("WorkflowNotFound"))
963                                     RaiseServicesExceptionNotHandledEvent(ioe, id);
964                             }
965                             catch (Exception e)
966                             {
967                                 RaiseServicesExceptionNotHandledEvent(e, id);
968                             }
969                         }
970                     }
971                 }
972             }
973         }
974 
975 
976         // uidInstanceID, status, blocked, info, nextTimer
977 
SaveWorkflowInstanceState(Activity rootActivity, bool unlock)978         internal protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
979         {
980             if (rootActivity == null)
981                 throw new ArgumentNullException("rootActivity");
982 
983             WorkflowStatus workflowStatus = WorkflowPersistenceService.GetWorkflowStatus(rootActivity);
984             bool isInstanceBlocked = WorkflowPersistenceService.GetIsBlocked(rootActivity);
985             string instanceInfo = WorkflowPersistenceService.GetSuspendOrTerminateInfo(rootActivity);
986             Guid contextGuid = (Guid)rootActivity.GetValue(Activity.ActivityContextGuidProperty);
987 
988             PendingWorkItem item = new PendingWorkItem();
989             item.Type = PendingWorkItem.ItemType.Instance;
990             item.InstanceId = WorkflowEnvironment.WorkflowInstanceId;
991             if (workflowStatus != WorkflowStatus.Completed && workflowStatus != WorkflowStatus.Terminated)
992                 item.SerializedActivity = WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity);
993             else
994                 item.SerializedActivity = new Byte[0];
995             item.Status = (int)workflowStatus;
996             item.Blocked = isInstanceBlocked ? 1 : 0;
997             item.Info = instanceInfo;
998             item.StateId = contextGuid;
999             item.Unlocked = unlock;
1000             TimerEventSubscriptionCollection timers = (TimerEventSubscriptionCollection)rootActivity.GetValue(TimerEventSubscriptionCollection.TimerCollectionProperty);
1001             Debug.Assert(timers != null, "TimerEventSubscriptionCollection should never be null, but it is");
1002             TimerEventSubscription sub = timers.Peek();
1003             item.NextTimer = sub == null ? SqlDateTime.MaxValue : sub.ExpiresAt;
1004             if (item.Info == null)
1005                 item.Info = "";
1006 
1007             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({4}):Committing instance {0}, Blocked={1}, Unlocked={2}, NextTimer={3}", contextGuid.ToString(), item.Blocked, item.Unlocked, item.NextTimer.Value.ToLocalTime(), _serviceInstanceId.ToString());
1008 
1009             WorkflowEnvironment.WorkBatch.Add(this, item);
1010         }
1011 
UnlockWorkflowInstanceState(Activity rootActivity)1012         internal protected override void UnlockWorkflowInstanceState(Activity rootActivity)
1013         {
1014             PendingWorkItem item = new PendingWorkItem();
1015             item.Type = PendingWorkItem.ItemType.ActivationComplete;
1016             item.InstanceId = WorkflowEnvironment.WorkflowInstanceId;
1017             WorkflowEnvironment.WorkBatch.Add(this, item);
1018             WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}):Unlocking instance {1}", _serviceInstanceId.ToString(), item.InstanceId.ToString());
1019         }
1020 
LoadWorkflowInstanceState(Guid id)1021         internal protected override Activity LoadWorkflowInstanceState(Guid id)
1022         {
1023             using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries))
1024             {
1025                 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}):Loading instance {1}", _serviceInstanceId.ToString(), id.ToString());
1026                 byte[] state = persistenceDBAccessor.RetrieveInstanceState(id, _serviceInstanceId, OwnershipTimeout);
1027                 return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(state, null);
1028             }
1029 
1030         }
1031 
LoadExpiredTimerWorkflowIds()1032         public IList<Guid> LoadExpiredTimerWorkflowIds()
1033         {
1034             if (State == WorkflowRuntimeServiceState.Started)
1035             {
1036                 return LoadExpiredTimerIds();
1037             }
1038             else
1039             {
1040                 throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.WorkflowRuntimeNotStarted));
1041             }
1042         }
1043 
LoadExpiredTimerIds()1044         private IList<Guid> LoadExpiredTimerIds()
1045         {
1046             using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries))
1047             {
1048                 return persistenceDBAccessor.RetrieveExpiredTimerIds(_serviceInstanceId, OwnershipTimeout);
1049             }
1050         }
1051 
SaveCompletedContextActivity(Activity completedScopeActivity)1052         internal protected override void SaveCompletedContextActivity(Activity completedScopeActivity)
1053         {
1054             PendingWorkItem item = new PendingWorkItem();
1055             item.Type = PendingWorkItem.ItemType.CompletedScope;
1056             item.SerializedActivity = WorkflowPersistenceService.GetDefaultSerializedForm(completedScopeActivity);
1057             item.InstanceId = WorkflowEnvironment.WorkflowInstanceId;
1058             item.StateId = ((ActivityExecutionContextInfo)completedScopeActivity.GetValue(Activity.ActivityExecutionContextInfoProperty)).ContextGuid;
1059 
1060             WorkflowEnvironment.WorkBatch.Add(this, item);
1061         }
1062 
LoadCompletedContextActivity(Guid id, Activity outerActivity)1063         internal protected override Activity LoadCompletedContextActivity(Guid id, Activity outerActivity)
1064         {
1065             using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries))
1066             {
1067                 byte[] state = persistenceDBAccessor.RetrieveCompletedScope(id);
1068                 return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(state, outerActivity);
1069             }
1070         }
1071 
UnloadOnIdle(Activity activity)1072         internal protected override bool UnloadOnIdle(Activity activity)
1073         {
1074             return _unloadOnIdle;
1075         }
1076 
GetAllWorkflows()1077         public IEnumerable<SqlPersistenceWorkflowInstanceDescription> GetAllWorkflows()
1078         {
1079             if (State == WorkflowRuntimeServiceState.Started)
1080             {
1081                 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries))
1082                 {
1083                     return persistenceDBAccessor.RetrieveAllInstanceDescriptions();
1084                 }
1085             }
1086             else
1087             {
1088                 throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.WorkflowRuntimeNotStarted));
1089             }
1090         }
1091 
1092         #region IPendingWork methods
1093 
IPendingWork.MustCommit(ICollection items)1094         bool IPendingWork.MustCommit(ICollection items)
1095         {
1096             return true;
1097         }
1098 
1099         /// <summary>
1100         /// Commmit the work items using the transaction
1101         /// </summary>
1102         /// <param name="transaction"></param>
1103         /// <param name="items"></param>
IPendingWork.Commit(System.Transactions.Transaction transaction, ICollection items)1104         void IPendingWork.Commit(System.Transactions.Transaction transaction, ICollection items)
1105         {
1106             PersistenceDBAccessor persistenceDBAccessor = null;
1107             try
1108             {
1109                 persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, transaction, _transactionService);
1110                 foreach (PendingWorkItem item in items)
1111                 {
1112                     switch (item.Type)
1113                     {
1114                         case PendingWorkItem.ItemType.Instance:
1115                             persistenceDBAccessor.InsertInstanceState(item, _serviceInstanceId, OwnershipTimeout);
1116                             break;
1117 
1118                         case PendingWorkItem.ItemType.CompletedScope:
1119                             persistenceDBAccessor.InsertCompletedScope(item.InstanceId, item.StateId, item.SerializedActivity);
1120                             break;
1121 
1122                         case PendingWorkItem.ItemType.ActivationComplete:
1123                             persistenceDBAccessor.ActivationComplete(item.InstanceId, _serviceInstanceId);
1124                             break;
1125 
1126                         default:
1127                             Debug.Assert(false, "Committing unknown pending work item type in SqlPersistenceService.Commit()");
1128                             break;
1129                     }
1130 
1131                 }
1132             }
1133             catch (SqlException se)
1134             {
1135                 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService({1})Exception thrown while persisting instance: {0}", se.Message, _serviceInstanceId.ToString());
1136                 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "stacktrace : {0}", se.StackTrace);
1137 
1138                 if (se.Number == _deadlock)
1139                 {
1140                     PersistenceException pe = new PersistenceException(se.Message, se);
1141                     throw pe;
1142                 }
1143                 else
1144                 {
1145                     throw;
1146                 }
1147 
1148             }
1149             catch (Exception e)
1150             {
1151                 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService({1}): Exception thrown while persisting instance: {0}", e.Message, _serviceInstanceId.ToString());
1152                 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "stacktrace : {0}", e.StackTrace);
1153                 throw e;
1154             }
1155             finally
1156             {
1157                 if (persistenceDBAccessor != null)
1158                     persistenceDBAccessor.Dispose();
1159             }
1160         }
1161 
1162 
1163         /// <summary>
1164         /// Perform necesssary cleanup. Called when the scope
1165         /// has finished processing this batch of work items
1166         /// </summary>
1167         /// <param name="succeeded"></param>
1168         /// <param name="items"></param>
IPendingWork.Complete(bool succeeded, ICollection items)1169         void IPendingWork.Complete(bool succeeded, ICollection items)
1170         {
1171             if (loadingTimer != null && succeeded)
1172             {
1173                 foreach (PendingWorkItem item in items)
1174                 {
1175                     if (item.Type.Equals(PendingWorkItem.ItemType.Instance))
1176                     {
1177                         loadingTimer.Update((DateTime)item.NextTimer);
1178                     }
1179                 }
1180             }
1181         }
1182 
1183         #endregion IPendingWork Methods
1184 
1185     }
1186 
1187     internal class SmartTimer : IDisposable
1188     {
1189         private object locker = new object();
1190         private Timer timer;
1191         private DateTime next;
1192         private bool nextChanged;
1193         private TimeSpan period;
1194         private TimerCallback callback;
1195         private TimeSpan minUpdate = new TimeSpan(0, 0, 5);
1196         private TimeSpan infinite = new TimeSpan(Timeout.Infinite);
1197 
SmartTimer(TimerCallback callback, object state, TimeSpan due, TimeSpan period)1198         public SmartTimer(TimerCallback callback, object state, TimeSpan due, TimeSpan period)
1199         {
1200             this.period = period;
1201             this.callback = callback;
1202             this.next = DateTime.UtcNow + due;
1203             this.timer = new Timer(HandleCallback, state, due, infinite);
1204         }
1205 
Update(DateTime newNext)1206         public void Update(DateTime newNext)
1207         {
1208             if (newNext < next && (next - DateTime.UtcNow) > minUpdate)
1209             {
1210                 lock (locker)
1211                 {
1212                     if (newNext < next && (next - DateTime.UtcNow) > minUpdate && timer != null)
1213                     {
1214                         next = newNext;
1215                         nextChanged = true;
1216                         TimeSpan when = next - DateTime.UtcNow;
1217                         if (when < TimeSpan.Zero)
1218                             when = TimeSpan.Zero;
1219                         timer.Change(when, infinite);
1220                     }
1221                 }
1222             }
1223         }
1224 
HandleCallback(object state)1225         private void HandleCallback(object state)
1226         {
1227             try
1228             {
1229                 callback(state);
1230             }
1231             finally
1232             {
1233                 lock (locker)
1234                 {
1235                     if (timer != null)
1236                     {
1237                         if (!nextChanged)
1238                             next = DateTime.UtcNow + period;
1239                         else
1240                             nextChanged = false;
1241                         TimeSpan when = next - DateTime.UtcNow;
1242                         if (when < TimeSpan.Zero)
1243                             when = TimeSpan.Zero;
1244                         timer.Change(when, infinite);
1245                     }
1246                 }
1247             }
1248         }
1249 
1250 
Dispose()1251         public void Dispose()
1252         {
1253             lock (locker)
1254             {
1255                 if (timer != null)
1256                 {
1257                     timer.Dispose();
1258                     timer = null;
1259                 }
1260             }
1261         }
1262     }
1263 }
1264 
1265