1 using System; 2 using System.IO; 3 using System.Transactions; 4 using System.Diagnostics; 5 using System.Data; 6 using System.Data.Common; 7 using System.Data.SqlTypes; 8 using System.Data.SqlClient; 9 using System.Runtime.Serialization; 10 using System.Runtime.Serialization.Formatters.Binary; 11 using System.Collections; 12 using System.Collections.Generic; 13 using System.Collections.Specialized; 14 using System.Configuration; 15 using System.Text.RegularExpressions; 16 using System.Security.Permissions; 17 using System.Threading; 18 19 using System.Workflow.Runtime.Hosting; 20 using System.Workflow.Runtime; 21 using System.Workflow.ComponentModel; 22 using System.Globalization; 23 24 namespace System.Workflow.Runtime.Hosting 25 { 26 #region PersistenceDBAccessor 27 28 29 internal sealed class PendingWorkItem 30 { 31 public enum ItemType { Instance, CompletedScope, ActivationComplete }; 32 33 public ItemType Type; 34 public Guid InstanceId; 35 public Guid StateId; 36 public Byte[] SerializedActivity; 37 public int Status; 38 public int Blocked; 39 public string Info; 40 public bool Unlocked; 41 public SqlDateTime NextTimer; 42 } 43 44 /// <summary> 45 /// This class does DB accessing work in the context of one connection 46 /// </summary> 47 internal sealed class PersistenceDBAccessor : IDisposable 48 { 49 50 DbResourceAllocator dbResourceAllocator; 51 DbTransaction localTransaction; 52 DbConnection connection; 53 bool needToCloseConnection; 54 DbRetry dbRetry = null; 55 56 private class RetryReadException : Exception 57 { 58 } 59 60 #if DEBUG 61 private static Dictionary<System.Guid, string> _persistenceToDatabaseMap = new Dictionary<Guid, string>(); 62 InsertToDbMap(Guid serviceId, string dbName)63 private static void InsertToDbMap(Guid serviceId, string dbName) 64 { 65 lock (_persistenceToDatabaseMap) 66 { 67 if (!_persistenceToDatabaseMap.ContainsKey(serviceId)) 68 { 69 _persistenceToDatabaseMap[serviceId] = dbName; 70 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): writing to database: {1}", serviceId.ToString(), dbName); 71 } 72 } 73 } 74 #endif 75 #region Constructor/Disposer 76 77 78 /// <summary> 79 /// DB access done under a transaction and uses a connection enlisted to this transaction. 80 /// </summary> 81 /// <param name="dbResourceAllocator">Helper to get database connection/command/store procedure parameters/etc</param> 82 /// <param name="transaction">The transaction to do this work under</param> PersistenceDBAccessor( DbResourceAllocator dbResourceAllocator, System.Transactions.Transaction transaction, WorkflowCommitWorkBatchService transactionService)83 internal PersistenceDBAccessor( 84 DbResourceAllocator dbResourceAllocator, 85 System.Transactions.Transaction transaction, 86 WorkflowCommitWorkBatchService transactionService) 87 { 88 this.dbResourceAllocator = dbResourceAllocator; 89 this.localTransaction = DbResourceAllocator.GetLocalTransaction( 90 transactionService, transaction); 91 // Get a connection enlisted to this transaction, may or may not need to be freed depending on 92 // if the transaction does connection sharing 93 this.connection = this.dbResourceAllocator.GetEnlistedConnection( 94 transactionService, transaction, out needToCloseConnection); 95 // 96 // No retries for external transactions 97 this.dbRetry = new DbRetry(false); 98 } 99 100 /// <summary> 101 /// DB access done without a transaction in a newly opened connection 102 /// </summary> 103 /// <param name="dbResourceAllocator">Helper to get database connection/command/store procedure parameters/etc</param> PersistenceDBAccessor(DbResourceAllocator dbResourceAllocator, bool enableRetries)104 internal PersistenceDBAccessor(DbResourceAllocator dbResourceAllocator, bool enableRetries) 105 { 106 this.dbResourceAllocator = dbResourceAllocator; 107 this.dbRetry = new DbRetry(enableRetries); 108 DbConnection conn = null; 109 short count = 0; 110 while (true) 111 { 112 try 113 { 114 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService OpenConnection start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 115 conn = this.dbResourceAllocator.OpenNewConnection(); 116 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService. OpenConnection end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 117 118 if ((null == conn) || (ConnectionState.Open != conn.State)) 119 throw new InvalidOperationException(ExecutionStringManager.InvalidConnection); 120 break; 121 } 122 catch (Exception e) 123 { 124 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService caught exception from OpenConnection: " + e.ToString()); 125 126 if (dbRetry.TryDoRetry(ref count)) 127 { 128 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService retrying."); 129 continue; 130 } 131 throw; 132 } 133 } 134 connection = conn; 135 needToCloseConnection = true; 136 } 137 138 Dispose()139 public void Dispose() 140 { 141 if (needToCloseConnection) 142 { 143 Debug.Assert(this.connection != null, "No connection to dispose"); 144 this.connection.Dispose(); 145 } 146 } 147 148 #endregion Constructor/Disposer 149 150 #region Public Methods Exposed for the Batch to call 151 DbOwnerId(Guid ownerId)152 private object DbOwnerId(Guid ownerId) 153 { 154 // Empty guid signals no lock, but the database uses null for that, so convert empty to null 155 if (ownerId == Guid.Empty) 156 return null; 157 return ownerId; 158 } 159 InsertInstanceState(PendingWorkItem item, Guid ownerId, DateTime ownedUntil)160 public void InsertInstanceState(PendingWorkItem item, Guid ownerId, DateTime ownedUntil) 161 { 162 /* sproc params 163 @uidInstanceID uniqueidentifier, 164 @state image, 165 @status int, 166 @artifacts image, 167 @queueingState image, 168 @unlocked int, 169 @blocked int, 170 @info ntext, 171 @ownerId uniqueidentifier, 172 @ownedUntil datetime 173 @nextTimer datetime 174 */ 175 DbCommand command = NewStoredProcCommand("InsertInstanceState"); 176 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@uidInstanceID", item.InstanceId)); 177 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@state", item.SerializedActivity)); 178 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@status", item.Status)); 179 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@unlocked", item.Unlocked)); 180 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@blocked", item.Blocked)); 181 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@info", item.Info)); 182 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil)); 183 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId))); 184 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@nextTimer", item.NextTimer)); 185 DbParameter p1 = this.dbResourceAllocator.NewDbParameter(); 186 p1.ParameterName = "@result"; 187 p1.DbType = DbType.Int32; 188 p1.Value = 0; 189 p1.Direction = ParameterDirection.InputOutput; 190 command.Parameters.Add(p1); 191 //command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@result", DbType.Int32, ParameterDirection.Output)); 192 DbParameter p2 = this.dbResourceAllocator.NewDbParameter(); 193 p2.ParameterName = "@currentOwnerID"; 194 p2.DbType = DbType.Guid; 195 p2.Value = Guid.Empty; 196 p2.Direction = ParameterDirection.InputOutput; 197 command.Parameters.Add(p2); 198 //command.Parameters.Add(new DbParameter(this.dbResourceAllocator.NewDbParameter("@currentOwnerID", DbType.Guid, ParameterDirection.InputOutput)); 199 #if DEBUG 200 InsertToDbMap(ownerId, connection.Database); 201 #endif 202 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): inserting instance: {1}, unlocking: {2} database: {3}", ownerId.ToString(), item.InstanceId.ToString(), item.Unlocked.ToString(), connection.Database); 203 // 204 // Cannot retry locally here as we don't own the tx 205 // Rely on external retries at the batch commit or workflow level (for tx scopes) 206 command.ExecuteNonQuery(); 207 CheckOwnershipResult(command); 208 } 209 210 InsertCompletedScope(Guid instanceId, Guid scopeId, Byte[] state)211 public void InsertCompletedScope(Guid instanceId, Guid scopeId, Byte[] state) 212 { 213 /* sproc params 214 @completedScopeID uniqueidentifier, 215 @state image 216 */ 217 DbCommand command = NewStoredProcCommand("InsertCompletedScope"); 218 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("instanceID", instanceId)); 219 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("completedScopeID", scopeId)); 220 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("state", state)); 221 // 222 // Cannot retry locally here as we don't own the tx 223 // Rely on external retries at the batch commit or workflow level (for tx scopes) 224 command.ExecuteNonQuery(); 225 } 226 ActivationComplete(Guid instanceId, Guid ownerId)227 public void ActivationComplete(Guid instanceId, Guid ownerId) 228 { 229 /* sproc params 230 @instanceID uniqueidentifier, 231 @ownerID uniqueidentifier, 232 */ 233 DbCommand command = NewStoredProcCommand("UnlockInstanceState"); 234 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@uidInstanceID", instanceId)); 235 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId))); 236 #if DEBUG 237 InsertToDbMap(ownerId, connection.Database); 238 #endif 239 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): unlocking instance: {1}, database: {2}", ownerId.ToString(), instanceId.ToString(), connection.Database); 240 command.ExecuteNonQuery(); 241 } 242 RetrieveNonblockingInstanceStateIds(Guid ownerId, DateTime ownedUntil)243 public IList<Guid> RetrieveNonblockingInstanceStateIds(Guid ownerId, DateTime ownedUntil) 244 { 245 List<Guid> gs = null; 246 DbDataReader dr = null; 247 short count = 0; 248 while (true) 249 { 250 try 251 { 252 // 253 // Check and reset the connection as needed before building the command 254 if ((null == connection) || (ConnectionState.Open != connection.State)) 255 ResetConnection(); 256 257 DbCommand command = NewStoredProcCommand("RetrieveNonblockingInstanceStateIds"); 258 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil)); 259 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId))); 260 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@now", DateTime.UtcNow)); 261 262 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 263 dr = command.ExecuteReader(CommandBehavior.CloseConnection); 264 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 265 266 gs = new List<Guid>(); 267 while (dr.Read()) 268 { 269 gs.Add(dr.GetGuid(0)); 270 } 271 break; 272 } 273 catch (Exception e) 274 { 275 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds caught exception from ExecuteReader: " + e.ToString()); 276 277 if (dbRetry.TryDoRetry(ref count)) 278 { 279 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveNonblockingInstanceStateIds retrying."); 280 continue; 281 } 282 throw; 283 } 284 finally 285 { 286 if (dr != null) 287 dr.Close(); 288 } 289 } 290 291 return gs; 292 } 293 TryRetrieveANonblockingInstanceStateId(Guid ownerId, DateTime ownedUntil, out Guid instanceId)294 public bool TryRetrieveANonblockingInstanceStateId(Guid ownerId, DateTime ownedUntil, out Guid instanceId) 295 { 296 short count = 0; 297 while (true) 298 { 299 try 300 { 301 // 302 // Check and reset the connection as needed before building the command 303 if ((null == connection) || (ConnectionState.Open != connection.State)) 304 ResetConnection(); 305 306 DbCommand command = NewStoredProcCommand("RetrieveANonblockingInstanceStateId"); 307 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil)); 308 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId))); 309 DbParameter p2 = this.dbResourceAllocator.NewDbParameter(); 310 p2.ParameterName = "@uidInstanceID"; 311 p2.DbType = DbType.Guid; 312 p2.Value = null; 313 p2.Direction = ParameterDirection.InputOutput; 314 315 command.Parameters.Add(p2); 316 317 DbParameter found = this.dbResourceAllocator.NewDbParameter(); 318 found.ParameterName = "@found"; 319 found.DbType = DbType.Boolean; 320 found.Value = null; 321 found.Direction = ParameterDirection.Output; 322 323 command.Parameters.Add(found); 324 325 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 326 command.ExecuteNonQuery(); 327 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 328 329 if ((null != found.Value) && ((bool)found.Value)) 330 { 331 instanceId = (Guid)p2.Value; 332 return true; 333 } 334 else 335 { 336 instanceId = Guid.Empty; 337 return false; 338 } 339 } 340 catch (Exception e) 341 { 342 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId caught exception from ExecuteNonQuery: " + e.ToString()); 343 344 if (dbRetry.TryDoRetry(ref count)) 345 { 346 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.TryRetrieveANonblockingInstanceStateId retrying."); 347 continue; 348 } 349 throw; 350 } 351 } 352 353 } 354 355 RetrieveExpiredTimerIds(Guid ownerId, DateTime ownedUntil)356 public IList<Guid> RetrieveExpiredTimerIds(Guid ownerId, DateTime ownedUntil) 357 { 358 List<Guid> gs = null; 359 DbDataReader dr = null; 360 361 short count = 0; 362 while (true) 363 { 364 try 365 { 366 // 367 // Check and reset the connection as needed before building the command 368 if ((null == connection) || (ConnectionState.Open != connection.State)) 369 ResetConnection(); 370 371 DbCommand command = NewStoredProcCommand("RetrieveExpiredTimerIds"); 372 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", ownedUntil == DateTime.MaxValue ? SqlDateTime.MaxValue : ownedUntil)); 373 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId))); 374 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@now", DateTime.UtcNow)); 375 376 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 377 dr = command.ExecuteReader(CommandBehavior.CloseConnection); 378 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 379 380 gs = new List<Guid>(); 381 while (dr.Read()) 382 { 383 gs.Add(dr.GetGuid(0)); 384 } 385 break; 386 } 387 catch (Exception e) 388 { 389 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds caught exception from ExecuteReader: " + e.ToString()); 390 391 if (dbRetry.TryDoRetry(ref count)) 392 { 393 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveExpiredTimerIds retrying."); 394 continue; 395 } 396 throw; 397 } 398 finally 399 { 400 if (dr != null) 401 dr.Close(); 402 } 403 } 404 405 return gs; 406 } 407 RetrieveInstanceState(Guid instanceStateId, Guid ownerId, DateTime timeout)408 public Byte[] RetrieveInstanceState(Guid instanceStateId, Guid ownerId, DateTime timeout) 409 { 410 short count = 0; 411 byte[] state = null; 412 while (true) 413 { 414 try 415 { 416 // 417 // Check and reset the connection as needed before building the command 418 if ((null == connection) || (ConnectionState.Open != connection.State)) 419 ResetConnection(); 420 421 DbCommand command = NewStoredProcCommand("RetrieveInstanceState"); 422 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@uidInstanceID", instanceStateId)); 423 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownerID", DbOwnerId(ownerId))); 424 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@ownedUntil", timeout == DateTime.MaxValue ? SqlDateTime.MaxValue : timeout)); 425 DbParameter p1 = this.dbResourceAllocator.NewDbParameter(); 426 p1.ParameterName = "@result"; 427 p1.DbType = DbType.Int32; 428 p1.Value = 0; 429 p1.Direction = ParameterDirection.InputOutput; 430 command.Parameters.Add(p1); 431 //command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@result", DbType.Int32, ParameterDirection.Output)); 432 DbParameter p2 = this.dbResourceAllocator.NewDbParameter(); 433 p2.ParameterName = "@currentOwnerID"; 434 p2.DbType = DbType.Guid; 435 p2.Value = Guid.Empty; 436 p2.Direction = ParameterDirection.InputOutput; 437 command.Parameters.Add(p2); 438 //command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@currentOwnerID", DbType.Guid, ParameterDirection.InputOutput)); 439 #if DEBUG 440 InsertToDbMap(ownerId, connection.Database); 441 #endif 442 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): retreiving instance: {1}, database: {2}", ownerId.ToString(), instanceStateId.ToString(), connection.Database); 443 state = RetrieveStateFromDB(command, true, instanceStateId); 444 445 break; 446 } 447 catch (Exception e) 448 { 449 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveInstanceState caught exception: " + e.ToString()); 450 451 if (dbRetry.TryDoRetry(ref count)) 452 { 453 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveInstanceState retrying."); 454 continue; 455 } 456 else if (e is RetryReadException) // ### hardcoded retry to work around sql ADM64 read bug ### 457 { 458 count++; 459 if (count < 10) 460 continue; 461 else 462 break; // give up 463 } 464 throw; 465 } 466 } 467 468 if (state == null || state.Length == 0) 469 { 470 Exception e = new InvalidOperationException(string.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.InstanceNotFound, instanceStateId)); 471 e.Data["WorkflowNotFound"] = true; 472 throw e; 473 } 474 475 return state; 476 } 477 RetrieveCompletedScope(Guid scopeId)478 public Byte[] RetrieveCompletedScope(Guid scopeId) 479 { 480 short count = 0; 481 byte[] state = null; 482 while (true) 483 { 484 try 485 { 486 // 487 // Check and reset the connection as needed before building the command 488 if ((null == connection) || (ConnectionState.Open != connection.State)) 489 ResetConnection(); 490 491 DbCommand command = NewStoredProcCommand("RetrieveCompletedScope"); 492 command.Parameters.Add(this.dbResourceAllocator.NewDbParameter("@completedScopeID", scopeId)); 493 DbParameter p1 = this.dbResourceAllocator.NewDbParameter(); 494 p1.ParameterName = "@result"; 495 p1.DbType = DbType.Int32; 496 p1.Value = 0; 497 p1.Direction = ParameterDirection.InputOutput; 498 command.Parameters.Add(p1); 499 500 state = RetrieveStateFromDB(command, false, WorkflowEnvironment.WorkflowInstanceId); 501 502 break; 503 } 504 catch (Exception e) 505 { 506 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveCompletedScope caught exception: " + e.ToString()); 507 508 if (dbRetry.TryDoRetry(ref count)) 509 { 510 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveCompletedScope retrying."); 511 continue; 512 } 513 else if (e is RetryReadException) // ### hardcoded retry to work around sql ADM64 read bug ### 514 { 515 count++; 516 if (count < 10) 517 continue; 518 else 519 break; // give up 520 } 521 throw; 522 } 523 } 524 525 if (state == null || state.Length == 0) 526 throw new InvalidOperationException( 527 string.Format(Thread.CurrentThread.CurrentCulture, 528 ExecutionStringManager.CompletedScopeNotFound, scopeId)); 529 return state; 530 } 531 532 #endregion 533 534 #region private DB accessing methods 535 536 /// <summary> 537 /// This should only be called for non batch commits 538 /// </summary> 539 /// <returns></returns> ResetConnection()540 private DbConnection ResetConnection() 541 { 542 if (null != localTransaction) 543 throw new InvalidOperationException(ExecutionStringManager.InvalidOpConnectionReset); 544 545 if (!needToCloseConnection) 546 throw new InvalidOperationException(ExecutionStringManager.InvalidOpConnectionNotLocal); 547 548 if ((null != connection) && (ConnectionState.Closed != connection.State)) 549 connection.Close(); 550 connection.Dispose(); 551 552 connection = this.dbResourceAllocator.OpenNewConnection(); 553 554 return connection; 555 } 556 557 /// <summary> 558 /// Returns a stored procedure type DBCommand object with current connection and transaction 559 /// </summary> 560 /// <param name="commandText"></param> 561 /// <returns>the command object</returns> NewStoredProcCommand(string commandText)562 private DbCommand NewStoredProcCommand(string commandText) 563 { 564 DbCommand command = DbResourceAllocator.NewCommand(commandText, this.connection, this.localTransaction); 565 command.CommandType = CommandType.StoredProcedure; 566 567 return command; 568 } 569 CheckOwnershipResult(DbCommand command)570 private static void CheckOwnershipResult(DbCommand command) 571 { 572 DbParameter result = command.Parameters["@result"]; 573 if (result != null && result.Value != null && (int)result.Value == -2) // -2 is an ownership conflict 574 { 575 if (command.Parameters.Contains("@currentOwnerID")) 576 { 577 Guid currentOwnerId = Guid.Empty; 578 if (command.Parameters["@currentOwnerID"].Value is System.Guid) 579 currentOwnerId = (Guid)command.Parameters["@currentOwnerID"].Value; 580 Guid myId = (Guid)command.Parameters["@ownerID"].Value; 581 Guid instId = (Guid)command.Parameters["@uidInstanceID"].Value; 582 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): owership violation with {1} on instance {2}", myId.ToString(), currentOwnerId.ToString(), instId); 583 } 584 DbParameter instanceId = command.Parameters["@uidInstanceID"]; 585 throw new WorkflowOwnershipException((Guid)instanceId.Value); 586 } 587 } 588 589 /// <summary> 590 /// Helper to Public methods RetrieveInstanceState and RetrieveCompletedScope. 591 /// Retrieves an object from the DB by calling the specified stored procedure with specified stored proc params. 592 /// </summary> 593 /// <param name="command">Contains the stored procedure setting to be used to query against the Database</param> 594 /// <returns>an object to be casted to an activity 595 /// In case of RetrieveInstanceState, only running or suspended instances are returned and 596 /// exception is thrown for completed/terminated/not-found instances 597 /// </returns> RetrieveStateFromDB(DbCommand command, bool checkOwnership, Guid instanceId)598 private static Byte[] RetrieveStateFromDB(DbCommand command, bool checkOwnership, Guid instanceId) 599 { 600 DbDataReader dr = null; 601 byte[] result = null; 602 603 try 604 { 605 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB {0} ExecuteReader start: {1}", instanceId, DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 606 dr = command.ExecuteReader(); 607 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB {0} ExecuteReader end: {1}", instanceId, DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); 608 609 if (dr.Read()) 610 { 611 result = (byte[])dr.GetValue(0); 612 } 613 else 614 { 615 DbParameter resultParam = command.Parameters["@result"]; 616 if (resultParam == null || resultParam.Value == null) 617 { 618 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB Failed to read results {0}", instanceId); 619 } 620 else if ((int)resultParam.Value > 0) // found results but failed to read - sql bug - retry the query 621 { 622 WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService.RetrieveStateFromDB Failed to read results {1}, @result == {0}", (int)resultParam.Value, instanceId); 623 throw new RetryReadException(); 624 } 625 } 626 } 627 finally 628 { 629 if (dr != null) 630 dr.Close(); 631 } 632 633 if (checkOwnership) 634 CheckOwnershipResult(command); 635 636 return result; 637 } 638 639 #endregion private DB accessing methods 640 641 RetrieveAllInstanceDescriptions()642 internal IEnumerable<SqlPersistenceWorkflowInstanceDescription> RetrieveAllInstanceDescriptions() 643 { 644 List<SqlPersistenceWorkflowInstanceDescription> retval = new List<SqlPersistenceWorkflowInstanceDescription>(); 645 DbDataReader dr = null; 646 try 647 { 648 DbCommand command = NewStoredProcCommand("RetrieveAllInstanceDescriptions"); 649 dr = command.ExecuteReader(CommandBehavior.CloseConnection); 650 while (dr.Read()) 651 { 652 retval.Add(new SqlPersistenceWorkflowInstanceDescription( 653 dr.GetGuid(0), 654 (WorkflowStatus)dr.GetInt32(1), 655 dr.GetInt32(2) == 1 ? true : false, 656 dr.GetString(3), 657 (SqlDateTime)dr.GetDateTime(4) 658 )); 659 } 660 } 661 finally 662 { 663 if (dr != null) 664 dr.Close(); 665 } 666 return retval; 667 } 668 } 669 #endregion 670 671 [Obsolete("The System.Workflow.* types are deprecated. Instead, please use the new types from System.Activities.*")] 672 public class SqlWorkflowPersistenceService : WorkflowPersistenceService, IPendingWork 673 { 674 #region constants 675 // Configure parameters for this services 676 private const string InstanceOwnershipTimeoutSecondsToken = "OwnershipTimeoutSeconds"; 677 private const string UnloadOnIdleToken = "UnloadOnIdle"; 678 private const string EnableRetriesToken = "EnableRetries"; 679 680 #endregion constants 681 682 private bool _enableRetries = false; 683 private bool _ignoreCommonEnableRetries = false; 684 private DbResourceAllocator _dbResourceAllocator; 685 private WorkflowCommitWorkBatchService _transactionService; 686 private Guid _serviceInstanceId = Guid.Empty; 687 private TimeSpan _ownershipDelta; 688 private Boolean _unloadOnIdle; 689 const string LoadingIntervalToken = "LoadIntervalSeconds"; 690 TimeSpan loadingInterval = new TimeSpan(0, 2, 0); 691 private TimeSpan maxLoadingInterval = new TimeSpan(365, 0, 0, 0, 0); 692 SmartTimer loadingTimer; 693 object timerLock = new object(); 694 TimeSpan infinite = new TimeSpan(Timeout.Infinite); 695 private static int _deadlock = 1205; 696 697 // Saved from constructor input to be used in service start initialization 698 NameValueCollection configParameters; 699 string unvalidatedConnectionString; 700 701 public Guid ServiceInstanceId 702 { 703 get 704 { 705 return _serviceInstanceId; 706 } 707 } 708 709 public TimeSpan LoadingInterval 710 { 711 get { return loadingInterval; } 712 } 713 714 public bool EnableRetries 715 { 716 get { return _enableRetries; } 717 set 718 { 719 _enableRetries = value; 720 _ignoreCommonEnableRetries = true; 721 } 722 } 723 724 private DateTime OwnershipTimeout 725 { 726 get 727 { 728 DateTime timeout; 729 if (_ownershipDelta == TimeSpan.MaxValue) 730 timeout = DateTime.MaxValue; 731 else 732 timeout = DateTime.UtcNow + _ownershipDelta; 733 return timeout; 734 } 735 } 736 SqlWorkflowPersistenceService(string connectionString)737 public SqlWorkflowPersistenceService(string connectionString) 738 { 739 if (String.IsNullOrEmpty(connectionString)) 740 throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString); 741 742 this.unvalidatedConnectionString = connectionString; 743 } 744 SqlWorkflowPersistenceService(NameValueCollection parameters)745 public SqlWorkflowPersistenceService(NameValueCollection parameters) 746 { 747 if (parameters == null) 748 throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters); 749 750 _ownershipDelta = TimeSpan.MaxValue; // default is to never timeout 751 if (parameters != null) 752 { 753 foreach (string key in parameters.Keys) 754 { 755 if (key.Equals(DbResourceAllocator.ConnectionStringToken, StringComparison.OrdinalIgnoreCase)) 756 { 757 // the resource allocator (below) will process the connection string 758 } 759 else if (key.Equals(SqlWorkflowPersistenceService.InstanceOwnershipTimeoutSecondsToken, StringComparison.OrdinalIgnoreCase)) 760 { 761 int seconds = Convert.ToInt32(parameters[SqlWorkflowPersistenceService.InstanceOwnershipTimeoutSecondsToken], System.Globalization.CultureInfo.CurrentCulture); 762 if (seconds < 0) 763 throw new ArgumentOutOfRangeException(InstanceOwnershipTimeoutSecondsToken, seconds, ExecutionStringManager.InvalidOwnershipTimeoutValue); 764 _ownershipDelta = new TimeSpan(0, 0, seconds); 765 _serviceInstanceId = Guid.NewGuid(); 766 continue; 767 } 768 else if (key.Equals(SqlWorkflowPersistenceService.UnloadOnIdleToken, StringComparison.OrdinalIgnoreCase)) 769 { 770 _unloadOnIdle = bool.Parse(parameters[key]); 771 } 772 else if (key.Equals(LoadingIntervalToken, StringComparison.OrdinalIgnoreCase)) 773 { 774 int interval = int.Parse(parameters[key], CultureInfo.CurrentCulture); 775 if (interval > 0) 776 this.loadingInterval = new TimeSpan(0, 0, interval); 777 else 778 this.loadingInterval = TimeSpan.Zero; 779 if (this.loadingInterval > maxLoadingInterval) 780 throw new ArgumentOutOfRangeException(LoadingIntervalToken, this.LoadingInterval, ExecutionStringManager.LoadingIntervalTooLarge); 781 } 782 else if (key.Equals(SqlWorkflowPersistenceService.EnableRetriesToken, StringComparison.OrdinalIgnoreCase)) 783 { 784 // 785 // We have a local value for enable retries 786 _enableRetries = bool.Parse(parameters[key]); 787 _ignoreCommonEnableRetries = true; 788 } 789 else 790 { 791 throw new ArgumentException( 792 String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key), "parameters"); 793 } 794 } 795 } 796 797 this.configParameters = parameters; 798 } 799 SqlWorkflowPersistenceService(string connectionString, bool unloadOnIdle, TimeSpan instanceOwnershipDuration, TimeSpan loadingInterval)800 public SqlWorkflowPersistenceService(string connectionString, bool unloadOnIdle, TimeSpan instanceOwnershipDuration, TimeSpan loadingInterval) 801 { 802 if (String.IsNullOrEmpty(connectionString)) 803 throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString); 804 if (loadingInterval > maxLoadingInterval) 805 throw new ArgumentOutOfRangeException("loadingInterval", loadingInterval, ExecutionStringManager.LoadingIntervalTooLarge); 806 if (instanceOwnershipDuration < TimeSpan.Zero) 807 throw new ArgumentOutOfRangeException("instanceOwnershipDuration", instanceOwnershipDuration, ExecutionStringManager.InvalidOwnershipTimeoutValue); 808 809 this._ownershipDelta = instanceOwnershipDuration; 810 this._unloadOnIdle = unloadOnIdle; 811 this.loadingInterval = loadingInterval; 812 this.unvalidatedConnectionString = connectionString; 813 _serviceInstanceId = Guid.NewGuid(); 814 } 815 816 #region WorkflowRuntimeService 817 Start()818 override protected internal void Start() 819 { 820 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({1}): Starting, LoadInternalSeconds={0}", loadingInterval.TotalSeconds, _serviceInstanceId.ToString()); 821 822 _dbResourceAllocator = new DbResourceAllocator(this.Runtime, this.configParameters, this.unvalidatedConnectionString); 823 824 // Check connection string mismatch if using SharedConnectionWorkflowTransactionService 825 _transactionService = Runtime.GetService<WorkflowCommitWorkBatchService>(); 826 _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService); 827 828 // 829 // If we didn't find a local value for enable retries 830 // check in the common section 831 if ((!_ignoreCommonEnableRetries) && (null != base.Runtime)) 832 { 833 NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters; 834 if (commonConfigurationParameters != null) 835 { 836 // Then scan for connection string in the common configuration parameters section 837 foreach (string key in commonConfigurationParameters.AllKeys) 838 { 839 if (string.Compare(EnableRetriesToken, key, StringComparison.OrdinalIgnoreCase) == 0) 840 { 841 _enableRetries = bool.Parse(commonConfigurationParameters[key].Value); 842 break; 843 } 844 } 845 } 846 } 847 848 base.Start(); 849 } 850 OnStarted()851 protected override void OnStarted() 852 { 853 if (loadingInterval > TimeSpan.Zero) 854 { 855 lock (timerLock) 856 { 857 base.OnStarted(); 858 loadingTimer = new SmartTimer(new TimerCallback(LoadWorkflowsWithExpiredTimers), null, loadingInterval, loadingInterval); 859 } 860 } 861 RecoverRunningWorkflowInstances(); 862 } 863 Stop()864 protected internal override void Stop() 865 { 866 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}): Stopping", _serviceInstanceId.ToString()); 867 lock (timerLock) 868 { 869 base.Stop(); 870 if (loadingTimer != null) 871 { 872 loadingTimer.Dispose(); 873 loadingTimer = null; 874 } 875 } 876 } 877 878 #endregion WorkflowRuntimeService 879 RecoverRunningWorkflowInstances()880 private void RecoverRunningWorkflowInstances() 881 { 882 if (Guid.Empty == _serviceInstanceId) 883 { 884 // 885 // Only one host, get all the ids in one go 886 IList<Guid> instanceIds = null; 887 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries)) 888 { 889 instanceIds = persistenceDBAccessor.RetrieveNonblockingInstanceStateIds(_serviceInstanceId, OwnershipTimeout); 890 } 891 foreach (Guid instanceId in instanceIds) 892 { 893 try 894 { 895 WorkflowInstance instance = Runtime.GetWorkflow(instanceId); 896 instance.Load(); 897 } 898 catch (Exception e) 899 { 900 RaiseServicesExceptionNotHandledEvent(e, instanceId); 901 } 902 } 903 } 904 else 905 { 906 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries)) 907 { 908 // 909 // Load one at a time to avoid thrashing with other hosts 910 Guid instanceId; 911 while (persistenceDBAccessor.TryRetrieveANonblockingInstanceStateId(_serviceInstanceId, OwnershipTimeout, out instanceId)) 912 { 913 try 914 { 915 WorkflowInstance instance = Runtime.GetWorkflow(instanceId); 916 instance.Load(); 917 } 918 catch (Exception e) 919 { 920 RaiseServicesExceptionNotHandledEvent(e, instanceId); 921 } 922 } 923 } 924 } 925 } 926 927 LoadWorkflowsWithExpiredTimers(object ignored)928 private void LoadWorkflowsWithExpiredTimers(object ignored) 929 { 930 lock (timerLock) 931 { 932 if (this.State == WorkflowRuntimeServiceState.Started) 933 { 934 IList<Guid> ids = null; 935 try 936 { 937 938 ids = LoadExpiredTimerIds(); 939 } 940 catch (Exception e) 941 { 942 RaiseServicesExceptionNotHandledEvent(e, Guid.Empty); 943 } 944 if (ids != null) 945 { 946 foreach (Guid id in ids) 947 { 948 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({1}): Loading instance with expired timers {0}", id, _serviceInstanceId.ToString()); 949 try 950 { 951 Runtime.GetWorkflow(id).Load(); 952 } 953 // Ignore cases where the workflow has been stolen out from under us 954 catch (WorkflowOwnershipException) 955 { } 956 catch (ObjectDisposedException) 957 { 958 throw; 959 } 960 catch (InvalidOperationException ioe) 961 { 962 if (!ioe.Data.Contains("WorkflowNotFound")) 963 RaiseServicesExceptionNotHandledEvent(ioe, id); 964 } 965 catch (Exception e) 966 { 967 RaiseServicesExceptionNotHandledEvent(e, id); 968 } 969 } 970 } 971 } 972 } 973 } 974 975 976 // uidInstanceID, status, blocked, info, nextTimer 977 SaveWorkflowInstanceState(Activity rootActivity, bool unlock)978 internal protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock) 979 { 980 if (rootActivity == null) 981 throw new ArgumentNullException("rootActivity"); 982 983 WorkflowStatus workflowStatus = WorkflowPersistenceService.GetWorkflowStatus(rootActivity); 984 bool isInstanceBlocked = WorkflowPersistenceService.GetIsBlocked(rootActivity); 985 string instanceInfo = WorkflowPersistenceService.GetSuspendOrTerminateInfo(rootActivity); 986 Guid contextGuid = (Guid)rootActivity.GetValue(Activity.ActivityContextGuidProperty); 987 988 PendingWorkItem item = new PendingWorkItem(); 989 item.Type = PendingWorkItem.ItemType.Instance; 990 item.InstanceId = WorkflowEnvironment.WorkflowInstanceId; 991 if (workflowStatus != WorkflowStatus.Completed && workflowStatus != WorkflowStatus.Terminated) 992 item.SerializedActivity = WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity); 993 else 994 item.SerializedActivity = new Byte[0]; 995 item.Status = (int)workflowStatus; 996 item.Blocked = isInstanceBlocked ? 1 : 0; 997 item.Info = instanceInfo; 998 item.StateId = contextGuid; 999 item.Unlocked = unlock; 1000 TimerEventSubscriptionCollection timers = (TimerEventSubscriptionCollection)rootActivity.GetValue(TimerEventSubscriptionCollection.TimerCollectionProperty); 1001 Debug.Assert(timers != null, "TimerEventSubscriptionCollection should never be null, but it is"); 1002 TimerEventSubscription sub = timers.Peek(); 1003 item.NextTimer = sub == null ? SqlDateTime.MaxValue : sub.ExpiresAt; 1004 if (item.Info == null) 1005 item.Info = ""; 1006 1007 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({4}):Committing instance {0}, Blocked={1}, Unlocked={2}, NextTimer={3}", contextGuid.ToString(), item.Blocked, item.Unlocked, item.NextTimer.Value.ToLocalTime(), _serviceInstanceId.ToString()); 1008 1009 WorkflowEnvironment.WorkBatch.Add(this, item); 1010 } 1011 UnlockWorkflowInstanceState(Activity rootActivity)1012 internal protected override void UnlockWorkflowInstanceState(Activity rootActivity) 1013 { 1014 PendingWorkItem item = new PendingWorkItem(); 1015 item.Type = PendingWorkItem.ItemType.ActivationComplete; 1016 item.InstanceId = WorkflowEnvironment.WorkflowInstanceId; 1017 WorkflowEnvironment.WorkBatch.Add(this, item); 1018 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}):Unlocking instance {1}", _serviceInstanceId.ToString(), item.InstanceId.ToString()); 1019 } 1020 LoadWorkflowInstanceState(Guid id)1021 internal protected override Activity LoadWorkflowInstanceState(Guid id) 1022 { 1023 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries)) 1024 { 1025 WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlWorkflowPersistenceService({0}):Loading instance {1}", _serviceInstanceId.ToString(), id.ToString()); 1026 byte[] state = persistenceDBAccessor.RetrieveInstanceState(id, _serviceInstanceId, OwnershipTimeout); 1027 return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(state, null); 1028 } 1029 1030 } 1031 LoadExpiredTimerWorkflowIds()1032 public IList<Guid> LoadExpiredTimerWorkflowIds() 1033 { 1034 if (State == WorkflowRuntimeServiceState.Started) 1035 { 1036 return LoadExpiredTimerIds(); 1037 } 1038 else 1039 { 1040 throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.WorkflowRuntimeNotStarted)); 1041 } 1042 } 1043 LoadExpiredTimerIds()1044 private IList<Guid> LoadExpiredTimerIds() 1045 { 1046 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries)) 1047 { 1048 return persistenceDBAccessor.RetrieveExpiredTimerIds(_serviceInstanceId, OwnershipTimeout); 1049 } 1050 } 1051 SaveCompletedContextActivity(Activity completedScopeActivity)1052 internal protected override void SaveCompletedContextActivity(Activity completedScopeActivity) 1053 { 1054 PendingWorkItem item = new PendingWorkItem(); 1055 item.Type = PendingWorkItem.ItemType.CompletedScope; 1056 item.SerializedActivity = WorkflowPersistenceService.GetDefaultSerializedForm(completedScopeActivity); 1057 item.InstanceId = WorkflowEnvironment.WorkflowInstanceId; 1058 item.StateId = ((ActivityExecutionContextInfo)completedScopeActivity.GetValue(Activity.ActivityExecutionContextInfoProperty)).ContextGuid; 1059 1060 WorkflowEnvironment.WorkBatch.Add(this, item); 1061 } 1062 LoadCompletedContextActivity(Guid id, Activity outerActivity)1063 internal protected override Activity LoadCompletedContextActivity(Guid id, Activity outerActivity) 1064 { 1065 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries)) 1066 { 1067 byte[] state = persistenceDBAccessor.RetrieveCompletedScope(id); 1068 return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(state, outerActivity); 1069 } 1070 } 1071 UnloadOnIdle(Activity activity)1072 internal protected override bool UnloadOnIdle(Activity activity) 1073 { 1074 return _unloadOnIdle; 1075 } 1076 GetAllWorkflows()1077 public IEnumerable<SqlPersistenceWorkflowInstanceDescription> GetAllWorkflows() 1078 { 1079 if (State == WorkflowRuntimeServiceState.Started) 1080 { 1081 using (PersistenceDBAccessor persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, _enableRetries)) 1082 { 1083 return persistenceDBAccessor.RetrieveAllInstanceDescriptions(); 1084 } 1085 } 1086 else 1087 { 1088 throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.WorkflowRuntimeNotStarted)); 1089 } 1090 } 1091 1092 #region IPendingWork methods 1093 IPendingWork.MustCommit(ICollection items)1094 bool IPendingWork.MustCommit(ICollection items) 1095 { 1096 return true; 1097 } 1098 1099 /// <summary> 1100 /// Commmit the work items using the transaction 1101 /// </summary> 1102 /// <param name="transaction"></param> 1103 /// <param name="items"></param> IPendingWork.Commit(System.Transactions.Transaction transaction, ICollection items)1104 void IPendingWork.Commit(System.Transactions.Transaction transaction, ICollection items) 1105 { 1106 PersistenceDBAccessor persistenceDBAccessor = null; 1107 try 1108 { 1109 persistenceDBAccessor = new PersistenceDBAccessor(_dbResourceAllocator, transaction, _transactionService); 1110 foreach (PendingWorkItem item in items) 1111 { 1112 switch (item.Type) 1113 { 1114 case PendingWorkItem.ItemType.Instance: 1115 persistenceDBAccessor.InsertInstanceState(item, _serviceInstanceId, OwnershipTimeout); 1116 break; 1117 1118 case PendingWorkItem.ItemType.CompletedScope: 1119 persistenceDBAccessor.InsertCompletedScope(item.InstanceId, item.StateId, item.SerializedActivity); 1120 break; 1121 1122 case PendingWorkItem.ItemType.ActivationComplete: 1123 persistenceDBAccessor.ActivationComplete(item.InstanceId, _serviceInstanceId); 1124 break; 1125 1126 default: 1127 Debug.Assert(false, "Committing unknown pending work item type in SqlPersistenceService.Commit()"); 1128 break; 1129 } 1130 1131 } 1132 } 1133 catch (SqlException se) 1134 { 1135 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService({1})Exception thrown while persisting instance: {0}", se.Message, _serviceInstanceId.ToString()); 1136 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "stacktrace : {0}", se.StackTrace); 1137 1138 if (se.Number == _deadlock) 1139 { 1140 PersistenceException pe = new PersistenceException(se.Message, se); 1141 throw pe; 1142 } 1143 else 1144 { 1145 throw; 1146 } 1147 1148 } 1149 catch (Exception e) 1150 { 1151 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "SqlWorkflowPersistenceService({1}): Exception thrown while persisting instance: {0}", e.Message, _serviceInstanceId.ToString()); 1152 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Error, 0, "stacktrace : {0}", e.StackTrace); 1153 throw e; 1154 } 1155 finally 1156 { 1157 if (persistenceDBAccessor != null) 1158 persistenceDBAccessor.Dispose(); 1159 } 1160 } 1161 1162 1163 /// <summary> 1164 /// Perform necesssary cleanup. Called when the scope 1165 /// has finished processing this batch of work items 1166 /// </summary> 1167 /// <param name="succeeded"></param> 1168 /// <param name="items"></param> IPendingWork.Complete(bool succeeded, ICollection items)1169 void IPendingWork.Complete(bool succeeded, ICollection items) 1170 { 1171 if (loadingTimer != null && succeeded) 1172 { 1173 foreach (PendingWorkItem item in items) 1174 { 1175 if (item.Type.Equals(PendingWorkItem.ItemType.Instance)) 1176 { 1177 loadingTimer.Update((DateTime)item.NextTimer); 1178 } 1179 } 1180 } 1181 } 1182 1183 #endregion IPendingWork Methods 1184 1185 } 1186 1187 internal class SmartTimer : IDisposable 1188 { 1189 private object locker = new object(); 1190 private Timer timer; 1191 private DateTime next; 1192 private bool nextChanged; 1193 private TimeSpan period; 1194 private TimerCallback callback; 1195 private TimeSpan minUpdate = new TimeSpan(0, 0, 5); 1196 private TimeSpan infinite = new TimeSpan(Timeout.Infinite); 1197 SmartTimer(TimerCallback callback, object state, TimeSpan due, TimeSpan period)1198 public SmartTimer(TimerCallback callback, object state, TimeSpan due, TimeSpan period) 1199 { 1200 this.period = period; 1201 this.callback = callback; 1202 this.next = DateTime.UtcNow + due; 1203 this.timer = new Timer(HandleCallback, state, due, infinite); 1204 } 1205 Update(DateTime newNext)1206 public void Update(DateTime newNext) 1207 { 1208 if (newNext < next && (next - DateTime.UtcNow) > minUpdate) 1209 { 1210 lock (locker) 1211 { 1212 if (newNext < next && (next - DateTime.UtcNow) > minUpdate && timer != null) 1213 { 1214 next = newNext; 1215 nextChanged = true; 1216 TimeSpan when = next - DateTime.UtcNow; 1217 if (when < TimeSpan.Zero) 1218 when = TimeSpan.Zero; 1219 timer.Change(when, infinite); 1220 } 1221 } 1222 } 1223 } 1224 HandleCallback(object state)1225 private void HandleCallback(object state) 1226 { 1227 try 1228 { 1229 callback(state); 1230 } 1231 finally 1232 { 1233 lock (locker) 1234 { 1235 if (timer != null) 1236 { 1237 if (!nextChanged) 1238 next = DateTime.UtcNow + period; 1239 else 1240 nextChanged = false; 1241 TimeSpan when = next - DateTime.UtcNow; 1242 if (when < TimeSpan.Zero) 1243 when = TimeSpan.Zero; 1244 timer.Change(when, infinite); 1245 } 1246 } 1247 } 1248 } 1249 1250 Dispose()1251 public void Dispose() 1252 { 1253 lock (locker) 1254 { 1255 if (timer != null) 1256 { 1257 timer.Dispose(); 1258 timer = null; 1259 } 1260 } 1261 } 1262 } 1263 } 1264 1265