1 //---------------------------------------------------------------- 2 // Copyright (c) Microsoft Corporation. All rights reserved. 3 //---------------------------------------------------------------- 4 5 namespace System.ServiceModel.Activities.Dispatcher 6 { 7 using System; 8 using System.Activities; 9 using System.Activities.DynamicUpdate; 10 using System.Activities.Hosting; 11 using System.Activities.Tracking; 12 using System.Collections.Generic; 13 using System.Collections.ObjectModel; 14 using System.Diagnostics; 15 using System.Diagnostics.CodeAnalysis; 16 using System.Globalization; 17 using System.Linq; 18 using System.Runtime; 19 using System.Runtime.Interop; 20 using System.Runtime.DurableInstancing; 21 using System.Security; 22 using System.Security.Permissions; 23 using System.ServiceModel.Activation; 24 using System.ServiceModel.Activities; 25 using System.ServiceModel.Activities.Description; 26 using System.ServiceModel.Activities.Diagnostics; 27 using System.Threading; 28 using System.Transactions; 29 using System.Xml.Linq; 30 31 // WorkflowServiceInstance is free-threaded. It is responsible for the correct locking and usage of the underlying WorkflowInstance. 32 // Given that there are two simultaneous users of WorkflowInstance (WorkflowServiceInstance and Activities), 33 // it is imperative that WorkflowServiceInstance only calls into WorkflowInstance when there are no activities executing 34 // (and thus no worries about colliding with AEC calls). 35 36 // LOCKING SCHEME DESCRIPTION 37 // AcquireLock* - These are the only locks that should call Enter on the WorkflowExecutionLock. 38 // ReleaseLock - This is the only method that should call Exit on the WorkflowExecutionLock. 39 // Lock Handoff - The lock is often handed off from one thread to another. This is handled by 40 // WorkflowExecutionLock itself. If there is a waiter (someone called Enter) then the Exit call 41 // will simply notify the first waiter. 
// The waiter is now responsible for the lock.
// NOTE: There is a small period of time where no one thinks they own the lock. Exit has "handed
// off" the lock by calling Set on the waiter, but the waiter has not yet executed the code
// which sets ownsLock to true.
// Sync Handoff - During sync handoff the ref bool ownsLock will be set accordingly by the
// Acquire* method. These methods should always be called in a try block with a finally
// which calls ReleaseLock.
// Async Handoff - During async handoff the callback can assume it has the lock if either
// there was no exception (FastAsyncCallback) or the call to End sets the ref bool ownsLock
// to true. Note that in cases of async handoff there should always be a guarding ReleaseLock
// which releases the lock if the async call does not state that it has gone async.
// Scheduler Interactions - The scheduler's state MUST ONLY be changed with the activeOperationsLock
// held. This is to guarantee that a Pause (Acquire) is not clobbered by a concurrently executing
// Resume (Release) resulting in an instance operation which times out when it shouldn't have.
// ActiveOperations RefCount - The activeOperations ref count MUST be incremented before calling
// any of the Enter variations and must be decremented after leaving the Enter. ActiveOperations
// is how ReleaseLock determines whether to hand the lock off to a waiting operation or to continue
// executing the workflow when the workflow is in a runnable state.
// Future Innovation - If necessary we can consider iterating on the current code to provide
// better guarantees around async handoff. For example, at the risk of starvation we could
// actually exit the lock before notifying waiters rather than doing a direct handoff.
[Fx.Tag.XamlVisible(false)]
class WorkflowServiceInstance : WorkflowInstance
{
    static AsyncCallback handleEndReleaseInstance;
    static FastAsyncCallback lockAcquiredAsyncCallback = new FastAsyncCallback(OnLockAcquiredAsync);
    static AsyncCallback trackCompleteDoneCallback;
    static AsyncCallback trackIdleDoneCallback;
    static AsyncCallback trackUnhandledExceptionDoneCallback;
    static ReadOnlyCollection<BookmarkInfo> emptyBookmarkInfoCollection = new ReadOnlyCollection<BookmarkInfo>(new List<BookmarkInfo>());

    WorkflowExecutionLock executorLock;

    PersistenceContext persistenceContext;
    PersistencePipeline persistencePipelineInUse;
    bool abortingExtensions;

    int activeOperations;
    object activeOperationsLock;
    int handlerThreadId;
    bool isInHandler;
    List<AsyncWaitHandle> idleWaiters;
    List<AsyncWaitHandle> nextIdleWaiters;
    List<WaitForCanPersistAsyncResult> checkCanPersistWaiters;

    // Used for synchronizing ResumeBookmark calls on the load path from extensions (e.g DurableTimerExtension)
    AsyncWaitHandle workflowServiceInstanceReadyWaitHandle;
    bool isWorkflowServiceInstanceReady;

    // Tracking for one-time actions per instance lifetime (these end up being persisted)
    bool hasRaisedCompleted;
    bool hasPersistedDeleted;

    bool isRunnable;
    BufferedReceiveManager bufferedReceiveManager;
    State state;
    object thisLock;
    TransactionContext transactionContext;
    bool isInTransaction;
    bool isTransactedCancelled;
    Dictionary<string, List<PendingOperationAsyncResult>> pendingOperations;
    int pendingOperationCount;
    Guid instanceId;

    // Used for synchronizing unload with persist
    // This is to mark that the instance has made progress but has not been persisted by idle policy yet
    bool hasDataToPersist;

    // tracks the outstanding requests. This contributes to idle calculations, and the list is notified
    // if workflow completes in any way (including unhandled exception)
    List<WorkflowOperationContext> pendingRequests;

    // Various Policies
    UnloadInstancePolicyHelper unloadInstancePolicy;
    UnhandledExceptionPolicyHelper unhandledExceptionPolicy;
    int referenceCount;
    ThreadNeutralSemaphore acquireReferenceSemaphore;

    WorkflowServiceHost serviceHost;
    WorkflowCreationContext creationContext;
    bool creationContextAborted;
    IDictionary<string, object> workflowOutputs;
    Exception terminationException;
    ActivityInstanceState completionState;
    TimeSpan persistTimeout;
    TimeSpan trackTimeout;
    TimeSpan acquireLockTimeout;

    // Tracking for increment of ASP.NET busy count
    bool hasIncrementedBusyCount;

    // dummy ctor only used to calculate IsLoadTransactionRequired
    WorkflowServiceInstance(WorkflowServiceHost serviceHost)
        : base(serviceHost.Activity)
    {
    }

    // Real constructor. Copies the timeouts and the BufferedReceiveManager from the host, hooks the
    // persistence context's Closed event (when a context is supplied), creates the locks and takes
    // the two initial references described at the bottom of the body.
    WorkflowServiceInstance(Activity workflowDefinition, WorkflowIdentity definitionIdentity, Guid instanceId, WorkflowServiceHost serviceHost, PersistenceContext persistenceContext)
        : base(workflowDefinition, definitionIdentity)
    {
        this.serviceHost = serviceHost;
        this.instanceId = instanceId;
        this.persistTimeout = serviceHost.PersistTimeout;
        this.trackTimeout = serviceHost.TrackTimeout;
        this.bufferedReceiveManager = serviceHost.Extensions.Find<BufferedReceiveManager>();

        if (persistenceContext != null)
        {
            this.persistenceContext = persistenceContext;
            this.persistenceContext.Closed += this.OnPersistenceContextClosed;
        }

        this.thisLock = new object();
        this.pendingRequests = new List<WorkflowOperationContext>();
        this.executorLock = new WorkflowExecutionLock(this);
        this.activeOperationsLock = new object();
        this.acquireReferenceSemaphore = new ThreadNeutralSemaphore(1);
        this.acquireLockTimeout = TimeSpan.MaxValue;

        // Two initial references are held:
        // The first referenceCount is owned by UnloadInstancePolicy (ReleaseInstance)
        this.referenceCount = 1;
        // The second referenceCount is owned by the loader / creator of the instance.
        this.TryAddReference();
    }

    // Lazily created, thunked callback for OnTrackIdleDone.
    static AsyncCallback TrackIdleDoneCallback
    {
        get
        {
            if (trackIdleDoneCallback == null)
            {
                trackIdleDoneCallback = Fx.ThunkCallback(new AsyncCallback(OnTrackIdleDone));
            }

            return trackIdleDoneCallback;
        }
    }

    // Lazily created, thunked callback for OnTrackUnhandledExceptionDone.
    static AsyncCallback TrackUnhandledExceptionDoneCallback
    {
        get
        {
            if (trackUnhandledExceptionDoneCallback == null)
            {
                trackUnhandledExceptionDoneCallback = Fx.ThunkCallback(new AsyncCallback(OnTrackUnhandledExceptionDone));
            }

            return trackUnhandledExceptionDoneCallback;
        }
    }

    // Lazily created, thunked callback for OnTrackCompleteDone.
    static AsyncCallback TrackCompleteDoneCallback
    {
        get
        {
            if (trackCompleteDoneCallback == null)
            {
                trackCompleteDoneCallback = Fx.ThunkCallback(new AsyncCallback(OnTrackCompleteDone));
            }

            return trackCompleteDoneCallback;
        }
    }

    // cache the results for perf from the extension container
    // (stays null when the instance has no IPersistencePipelineModule extensions; see SetupExtensions)
    internal List<IPersistencePipelineModule> PipelineModules
    {
        get;
        private set;
    }

    public BufferedReceiveManager BufferedReceiveManager
    {
        get
        {
            return this.bufferedReceiveManager;
        }
    }

    public override Guid Id
    {
        get
        {
            return this.instanceId;
        }
    }

    public bool IsActive
    {
        get
        {
            return this.state == State.Active;
        }
    }

    // Set by InitializeInstance when a loaded instance was initialized with a DynamicUpdateMap.
    public bool HasBeenUpdated
    {
        get;
        private set;
    }

    protected override bool SupportsInstanceKeys
    {
        get
        {
            return true;
        }
    }

    bool IsIdle
    {
        get
        {
            return this.Controller.State == WorkflowInstanceState.Idle;
        }
    }

    // True once the controller reports Complete and the completed notification has not been raised yet.
    bool ShouldRaiseComplete
    {
        get
        {
            return this.Controller.State == WorkflowInstanceState.Complete && !this.hasRaisedCompleted;
        }
    }

    bool ShouldRaiseIdle
    {
        get
        {
            return this.IsIdle && !this.hasRaisedCompleted && this.state != State.Aborted;
        }
    }

    // True only while a handler is running and the caller is on the thread recorded for that handler.
    bool IsHandlerThread
    {
        get
        {
            return this.isInHandler && this.handlerThreadId == Thread.CurrentThread.ManagedThreadId;
        }
    }

    // Created on first use from the host's IdleTimeToPersist / IdleTimeToUnload settings.
    UnloadInstancePolicyHelper UnloadInstancePolicy
    {
        get
        {
            if (this.unloadInstancePolicy == null)
            {
                this.unloadInstancePolicy = new UnloadInstancePolicyHelper(this, this.serviceHost.IdleTimeToPersist, this.serviceHost.IdleTimeToUnload);
            }
            return this.unloadInstancePolicy;
        }
    }

    // Created on first use from the host's UnhandledExceptionAction setting.
    UnhandledExceptionPolicyHelper UnhandledExceptionPolicy
    {
        get
        {
            if (this.unhandledExceptionPolicy == null)
            {
                this.unhandledExceptionPolicy = new UnhandledExceptionPolicyHelper(this, this.serviceHost.UnhandledExceptionAction);
            }
            return this.unhandledExceptionPolicy;
        }
    }

    // create a dummy instance to configure extensions and determine if a load-time transaction is required
    public static bool IsLoadTransactionRequired(WorkflowServiceHost host)
    {
        WorkflowServiceInstance instance = new WorkflowServiceInstance(host);
        instance.RegisterExtensionManager(host.WorkflowExtensions);
        return instance.GetExtensions<IPersistencePipelineModule>().Any(module => module.IsLoadTransactionRequired);
    }

    // Creates a WorkflowServiceInstance and initializes it either from persisted state (loadedObject != null)
    // or as a brand new instance (loadedObject == null).
    //   loadedObject    - values read from the instance store; must contain a non-null WorkflowNamespace.Workflow
    //                     entry, otherwise an InstancePersistenceException is thrown.
    //   creationContext - only consulted on the new-instance path; supplies the workflow arguments.
    //   updateMap       - optional dynamic update map, only used on the load path.
    public static WorkflowServiceInstance InitializeInstance(PersistenceContext persistenceContext, Guid instanceId, Activity workflowDefinition, WorkflowIdentity definitionIdentity, IDictionary<XName, InstanceValue> loadedObject, WorkflowCreationContext creationContext,
        SynchronizationContext synchronizationContext, WorkflowServiceHost serviceHost, DynamicUpdateMap updateMap = null)
    {
        Fx.Assert(workflowDefinition != null, "workflowDefinition cannot be null.");
        Fx.Assert(serviceHost != null, "serviceHost cannot be null!");
        Fx.Assert(instanceId != Guid.Empty, "instanceId cannot be empty.");

        WorkflowServiceInstance workflowInstance = new WorkflowServiceInstance(workflowDefinition, definitionIdentity, instanceId, serviceHost, persistenceContext)
        {
            SynchronizationContext = synchronizationContext
        };

        // let us initialize the instance level extensions here
        workflowInstance.SetupExtensions(serviceHost.WorkflowExtensions);

        if (loadedObject != null)
        {
            InstanceValue stateValue;
            object deserializedRuntimeState;

            if (!loadedObject.TryGetValue(WorkflowNamespace.Workflow, out stateValue) || stateValue.Value == null)
            {
                throw FxTrace.Exception.AsError(
                    new InstancePersistenceException(SR.WorkflowInstanceNotFoundInStore(instanceId)));
            }
            deserializedRuntimeState = stateValue.Value;

            if (loadedObject.TryGetValue(WorkflowServiceNamespace.CreationContext, out stateValue))
            {
                workflowInstance.creationContext = (WorkflowCreationContext)stateValue.Value;
            }

            if (persistenceContext.IsSuspended)
            {
                workflowInstance.state = State.Suspended;
            }
            try
            {
                workflowInstance.Initialize(deserializedRuntimeState, updateMap);
            }
            catch (InstanceUpdateException)
            {
                // Need to flush the tracking record for the update failure
                workflowInstance.ScheduleAbortTracking(true);
                throw;
            }

            if (updateMap != null)
            {
                workflowInstance.HasBeenUpdated = true;
            }
        }
        else
        {
            IList<Handle> rootExecutionProperties = null;
            IDictionary<string, object> workflowArguments = null;
            // Provide default CorrelationScope if root activity is not CorrelationScope
            if (!(workflowDefinition is CorrelationScope))
            {
                rootExecutionProperties = new List<Handle>(1)
                {
                    new CorrelationHandle()
                };
            }

            if (creationContext != null)
            {
                workflowArguments = creationContext.RawWorkflowArguments;
                workflowInstance.creationContext = creationContext;
            }
            workflowInstance.Initialize(workflowArguments, rootExecutionProperties);
        }

        return workflowInstance;
    }

    // Registers the extension manager and caches the IPersistencePipelineModule extensions.
    // PipelineModules is left null when there are none.
    void SetupExtensions(WorkflowInstanceExtensionManager extensionManager)
    {
        base.RegisterExtensionManager(extensionManager);

        // cache IPersistencePipelineModules
        // Materialize the sequence exactly once: counting it and then copying it would enumerate
        // GetExtensions twice.
        List<IPersistencePipelineModule> modules = new List<IPersistencePipelineModule>(base.GetExtensions<IPersistencePipelineModule>());
        if (modules.Count > 0)
        {
            this.PipelineModules = modules;
        }
    }

    // Closed handler for the persistence context: if the context was aborted by someone other than
    // AbortExtensions (abortingExtensions is false), abort the instance as well.
    void OnPersistenceContextClosed(object sender, EventArgs e)
    {
        if (this.persistenceContext.Aborted && !this.abortingExtensions)
        {
            AbortInstance(new FaultException(OperationExecutionFault.CreateAbortedFault(SR.DefaultAbortReason)), false);
        }
    }

    // Call when GetInstance to perform operation
    // Returns false when referenceCount is not positive; on success the pending unload policy is cancelled.
    bool TryAddReference()
    {
        bool success = false;
        lock (this.thisLock)
        {
            if (this.referenceCount > 0)
            {
                ++this.referenceCount;
                success = true;
            }
        }
        if (success)
        {
            this.UnloadInstancePolicy.Cancel();
        }
        return success;
    }

    // Called by unload via unload policy
    // Drops the count from 1 to 0 and returns true; returns false when any other reference is outstanding.
    bool TryReleaseLastReference()
    {
        lock (this.thisLock)
        {
            if (this.referenceCount == 1)
            {
                this.referenceCount = 0;
                return true;
            }
        }
        return false;
    }

    // Called when terminating ongoing unload
    void RecoverLastReference()
    {
        lock (this.thisLock)
        {
            Fx.Assert(this.referenceCount == 0, "referenceCount must be 0 during unload");
            this.referenceCount = 1;
        }
    }

    // Release after operation done
    // Returns the reference count after the decrement and gives the unload policy a chance to start.
    public int ReleaseReference()
    {
        int refCount;
        lock (this.thisLock)
        {
            Fx.AssertAndThrow(this.referenceCount > 1, "referenceCount must be greater than 1");
            refCount = --this.referenceCount;
        }
        StartUnloadInstancePolicyIfNecessary();
        return refCount;
    }

    void StartUnloadInstancePolicyIfNecessary()
    {
        // The conditions to start unload policy.
        //  - referenceCount is 1.  Like COM, This is the last reference count hold by WorkflowServiceInstance itself.
        //    It is incremented per command (control/resumebookmark) and decremented when command is done.
        //  - No lock pending.  In general, when referenceCount is 1, the executor lock is freed and WF is idled.
        //    There is, however, one narrow case for Persist activity.  When it goes async (executing Sql command),
        //    the referenceCount is decremented to 1 but WF scheduler still busy.  In this case, we will let
        //    the lock release to initiate the policy.
        //  - Not in transaction (TxCommit will take care of this).
        //  - Must not be in completed or unloaded or aborted states.
        // Note: it is okay to dirty read referenceCount and isLocked.  If the UnloadInstancePolicy starts before
        // increment, the increment will correct and cancel it.  If the increment happens before, ReleaseReference
        // will have a chance to start the policy.  Same applies to isLocked.
        if (this.referenceCount == 1 && !this.executorLock.IsLocked && !this.isInTransaction &&
            this.state != State.Completed && this.state != State.Unloaded && this.state != State.Aborted)
        {
            this.UnloadInstancePolicy.Begin();
        }
    }

    // Synchronously acquires the executor lock. On return ownsLock tells the caller whether it must
    // call ReleaseLock; it stays false when we are already on the handler thread.
    void AcquireLock(TimeSpan timeout, ref bool ownsLock)
    {
        Fx.Assert(!ownsLock, "We should never call acquire if we already think we own the lock.");

        if (this.IsHandlerThread)
        {
            // We're in a handler, on the handler thread, and doing work synchronously so we already have the lock
            return;
        }

        if (!this.executorLock.TryEnter(ref ownsLock))
        {
            Fx.Assert(!ownsLock, "This should always match the return of TryEnter and is only useful in light of exceptions");

            bool incrementedActiveOperations = false;
            object lockToken = null;

            try
            {
                lock (this.activeOperationsLock)
                {
                    // NOTE(review): the empty try with the work in finally presumably keeps the increment
                    // and the flag update together in the face of asynchronous exceptions - confirm.
                    try
                    {
                    }
                    finally
                    {
                        this.activeOperations++;
                        incrementedActiveOperations = true;
                    }

                    // An exception occurring before we call PauseScheduler causes no issues/races since
                    // we'll just cleanup activeOperations and be in the same state as when AcquireLock
                    // was called.

                    this.Controller.RequestPause();

                    this.executorLock.SetupWaiter(ref lockToken);
                }

                // There is a race here which is solved by code in ReleaseLock.  In short, if we fail
                // to acquire the lock here but before we decrement activeOperations the workflow pauses
                // then nothing will ever restart the workflow.  To that end, ReleaseLock does some
                // special handling when it exits the lock and no one is waiting.

                this.executorLock.Enter(timeout, ref lockToken, ref ownsLock);
            }
            finally
            {
                if (incrementedActiveOperations)
                {
                    lock (this.activeOperationsLock)
                    {
                        this.activeOperations--;
                    }
                }

                this.executorLock.CleanupWaiter(lockToken, ref ownsLock);
            }
        }
    }

    // Convenience overload: normal priority, scheduler pause requested.
    bool AcquireLockAsync(TimeSpan timeout, ref bool ownsLock, FastAsyncCallback callback, object state)
    {
        return AcquireLockAsync(timeout, false, false, ref ownsLock, callback, state);
    }

    // Tries to acquire the executor lock without blocking. Returns true when the call completed
    // synchronously (the value returned by TryEnter / EnterAsync); returns false when acquisition went
    // async, in which case callback is invoked through OnLockAcquiredAsync.
    //   isAbortPriority - forwarded to WorkflowExecutionLock.SetupWaiter.
    //   skipPause       - when true the scheduler is not asked to pause.
    bool AcquireLockAsync(TimeSpan timeout, bool isAbortPriority, bool skipPause, ref bool ownsLock, FastAsyncCallback callback, object state)
    {
        Fx.Assert(!ownsLock, "We should never call acquire if we already think we own the lock.");

        // We cannot just hand off the lock if we are in a handler thread
        // because this might eventually go async (during the operation)
        // and we could have multiple operations occurring concurrently.

        if (!this.executorLock.TryEnter(ref ownsLock))
        {
            Fx.Assert(!ownsLock, "This should always match the return of TryEnter and is only useful in light of exceptions");

            bool incrementedActiveOperations = false;
            bool decrementActiveOperations = true;
            object lockToken = null;

            try
            {
                lock (this.activeOperationsLock)
                {
                    // NOTE(review): same empty-try/finally shape as AcquireLock - presumably guards the
                    // increment/flag pair against asynchronous exceptions; confirm.
                    try
                    {
                    }
                    finally
                    {
                        this.activeOperations++;
                        incrementedActiveOperations = true;
                    }

                    // An exception occurring before we call PauseScheduler causes no issues/races since
                    // we'll just cleanup activeOperations and be in the same state as when AcquireLock
                    // was called.

                    if (!skipPause)
                    {
                        this.Controller.RequestPause();
                    }

                    this.executorLock.SetupWaiter(isAbortPriority, ref lockToken);
                }

                // If we get the lock here then we should decrement, otherwise
                // it is up to the lock acquired callback
                decrementActiveOperations = this.executorLock.EnterAsync(timeout, ref lockToken, ref ownsLock, lockAcquiredAsyncCallback, new AcquireLockAsyncData(this, callback, state));
                return decrementActiveOperations;
            }
            finally
            {
                if (incrementedActiveOperations && decrementActiveOperations)
                {
                    lock (this.activeOperationsLock)
                    {
                        this.activeOperations--;
                    }
                }

                this.executorLock.CleanupWaiter(lockToken, ref ownsLock);
            }
        }
        else
        {
            return true;
        }
    }

    // Completion callback for the async path of AcquireLockAsync: performs the activeOperations
    // decrement that AcquireLockAsync skipped, then forwards to the caller's callback.
    static void OnLockAcquiredAsync(object state, Exception asyncException)
    {
        AcquireLockAsyncData data = (AcquireLockAsyncData)state;

        lock (data.Instance.activeOperationsLock)
        {
            data.Instance.activeOperations--;
        }

        data.Callback(data.State, asyncException);
    }

    // Registers a new manual-reset idle event in idleWaiters, releases the executor lock and returns
    // the event for the caller to wait on.
    AsyncWaitHandle SetupIdleWaiter(ref bool ownsLock)
    {
        AsyncWaitHandle idleEvent = new AsyncWaitHandle(EventResetMode.ManualReset);

        lock (this.activeOperationsLock)
        {
            if (this.idleWaiters == null)
            {
                this.idleWaiters = new List<AsyncWaitHandle>();
            }

            this.idleWaiters.Add(idleEvent);
        }

        ReleaseLock(ref ownsLock);

        return idleEvent;
    }

    // Removes idleEvent from idleWaiters after a failed wait. Returns false (with ownsLock set to true)
    // only when the event was already removed by a notifier and the failure was a TimeoutException;
    // returns true in every other case.
    bool CleanupIdleWaiter(AsyncWaitHandle idleEvent, Exception waitException, ref bool ownsLock)
    {
        lock (this.activeOperationsLock)
        {
            if (!this.idleWaiters.Remove(idleEvent))
            {
                // If it wasn't in the list that means we raced between throwing from Wait
                // and setting the event.  This thread now is responsible for the lock.
                if (waitException is TimeoutException)
                {
                    // In the case of Timeout we let setting the event win and signal to
                    // swallow the exception

                    ownsLock = true;
                    return false;
                }
            }
        }

        return true;
    }

    // Called with the executor lock
    // Returns true if someone was notified (this thread no longer owns the lock) or false if
    // no one was notified.
    bool NotifyNextIdleWaiter(ref bool ownsLock)
    {
        // If we are no longer active, flush all idle waiters (next + current) because we will
        // not enter Idle state again.  For Suspended, even we could race to unsuspend and become idle,
        // the desirable behavior while suspending is to reject pending as well as new requests.
        if (this.state != State.Active)
        {
            PrepareNextIdleWaiter();
        }

        if (this.idleWaiters != null && this.idleWaiters.Count > 0)
        {
            // We need to be careful about setting this event because if there is an async
            // waiter then this thread will be used for some execution.  Therefore we shouldn't
            // call set with the activeOperationsLock held.
            AsyncWaitHandle idleEvent = null;

            // We need to lock this because a waiter might have timed out (or thrown another exception) and
            // could be trying to remove itself from the list without the executor lock.
            lock (this.activeOperationsLock)
            {
                if (this.idleWaiters.Count > 0)
                {
                    idleEvent = this.idleWaiters[0];
                    this.idleWaiters.RemoveAt(0);
                }
            }

            if (idleEvent != null)
            {
                idleEvent.Set();
                ownsLock = false;
                return true;
            }
        }

        return false;
    }

    // Moves every handle from nextIdleWaiters to the end of idleWaiters and empties nextIdleWaiters.
    void PrepareNextIdleWaiter()
    {
        if (this.nextIdleWaiters != null && this.nextIdleWaiters.Count > 0)
        {
            lock (this.activeOperationsLock)
            {
                if (this.idleWaiters == null)
                {
                    this.idleWaiters = new List<AsyncWaitHandle>();
                }

                for (int i = 0; i < this.nextIdleWaiters.Count; i++)
                {
                    this.idleWaiters.Add(this.nextIdleWaiters[i]);
                }
            }

            this.nextIdleWaiters.Clear();
        }
    }

    IAsyncResult BeginAcquireLockOnIdle(TimeSpan timeout, ref bool ownsLock, AsyncCallback callback, object state)
    {
        return new AcquireLockOnIdleAsyncResult(this, timeout, ref ownsLock, callback, state);
    }

    // End overload for results that completed synchronously.
    void EndAcquireLockOnIdle(IAsyncResult result)
    {
        Fx.Assert(result.CompletedSynchronously, "This overload should only be called when completed synchronously.");
        AcquireLockOnIdleAsyncResult.End(result);
    }

    // End overload for results that completed asynchronously; ownsLock is passed through to
    // AcquireLockOnIdleAsyncResult.End.
    void EndAcquireLockOnIdle(IAsyncResult result, ref bool ownsLock)
    {
        Fx.Assert(!result.CompletedSynchronously, "This overload should only be called when completed asynchronously.");
        AcquireLockOnIdleAsyncResult.End(result, ref ownsLock);
    }

    void ReleaseLock(ref bool ownsLock)
    {
        ReleaseLock(ref ownsLock, false);
    }

    // Releases the executor lock if the caller owns it (no-op otherwise). Depending on state the lock
    // is kept for the scheduler (which is then resumed), handed to the next idle waiter, or exited via
    // WorkflowExecutionLock.Exit.
    void ReleaseLock(ref bool ownsLock, bool hasBeenPersistedByIdlePolicy)
    {
        // The hasBeenPersistedByIdlePolicy flag is only true when this is part of the idle policy initiated persist.

        if (!ownsLock)
        {
            return;
        }

        Fx.Assert(!this.IsHandlerThread, "We never set ownsLock if we are on the handler thread and therefore should have shortcut out earlier.");

        bool resumeScheduler = false;

        bool needToSignalWorkflowServiceInstanceReadyWaitHandle = false;
        lock (this.thisLock)
        {
            this.isWorkflowServiceInstanceReady = true;
            if (this.workflowServiceInstanceReadyWaitHandle != null)
            {
                needToSignalWorkflowServiceInstanceReadyWaitHandle = true;
            }

            // Signal that workflow has made progress and this progress has not been persisted by idle policy,
            // we need to suppress the abort initiated by unload when TimeToPersist < TimeToUnload.
            // If ReleaseLock is done by anyone other than idle policy persist, we mark the instance dirty.
            // Conversely, if idle policy completed a persist, we mark the instance clean.
            this.hasDataToPersist = !hasBeenPersistedByIdlePolicy;
        }

        // The handle is set outside thisLock.
        if (needToSignalWorkflowServiceInstanceReadyWaitHandle)
        {
            this.workflowServiceInstanceReadyWaitHandle.Set();
        }

        lock (this.activeOperationsLock)
        {
            // We don't check for completion here because we need to make sure we always
            // drain the scheduler queue.  Note that the OnIdle handler only raises events
            // if the workflow is truly idle.  Therefore, if we are completed but not idle
            // then we won't raise the events.
            // Terminate capitalizes on this by assuring that there is at least one more
            // work item in the queue.  This provides a simple mechanism for getting a
            // scheduler thread to raise the completed event.
            bool isRunnable = this.state == State.Active && this.isRunnable && !this.IsIdle;
            if (isRunnable && this.activeOperations == 0)
            {
                ownsLock = false;
                resumeScheduler = true;
            }
            else if ((this.IsIdle || this.state != State.Active) && NotifyNextIdleWaiter(ref ownsLock))
            {
                // An idle waiter was signaled and NotifyNextIdleWaiter already cleared ownsLock.
            }
            else
            {
                // If we are runnable then we want to hang onto the lock if Exit finds no one waiting.
                if (!this.executorLock.Exit(isRunnable, ref ownsLock))
                {
                    // No one was waiting, but we had activeOperations (otherwise we would not have gotten
                    // to this branch of the if).  This means that we raced with a timeout and should resume
                    // the workflow's execution.  If we don't resume execution we'll just hang ... no one
                    // has the lock, the workflow is ready to execute, but it is not.
                    Fx.Assert(this.activeOperations > 0, "We should always have active operations otherwise we should have taken a different code path.");

                    // We no longer "own" the lock because the scheduler has taken control
                    ownsLock = false;

                    resumeScheduler = true;
                }
            }
        }

        if (resumeScheduler)
        {
            IncrementBusyCount();
            this.persistenceContext.Bookmarks = null;
            this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowExecuting(true);
            if (this.Controller.State == WorkflowInstanceState.Complete)
            {
                OnNotifyPaused();
            }
            else
            {
                this.Controller.Run();
            }
        }
    }

    // Begins abandoning the instance with abort tracking enabled.
    public IAsyncResult BeginAbandon(Exception reason, TimeSpan timeout, AsyncCallback callback, object state)
    {
        Fx.Assert(reason != null, "reason must not be null!");
        return BeginAbandon(reason, true, timeout, callback, state);
    }

    //used by UnloadPolicy when TimeToUnload > TimeToPersist to prevent an Abort tracking record.
IAsyncResult BeginAbandon(Exception reason, bool shouldTrackAbort, TimeSpan timeout, AsyncCallback callback, object state)
{
    return AbandonAsyncResult.Create(this, reason, shouldTrackAbort, timeout, callback, state);
}

public void EndAbandon(IAsyncResult result)
{
    AbandonAsyncResult.End(result);
}

IAsyncResult BeginAbandonAndSuspend(Exception reason, TimeSpan timeout, AsyncCallback callback, object state)
{
    Fx.Assert(reason != null, "reason must not be null!");
    return AbandonAndSuspendAsyncResult.Create(this, reason, timeout, callback, state);
}

void EndAbandonAndSuspend(IAsyncResult result)
{
    AbandonAndSuspendAsyncResult.End(result);
}

// Aborts the instance with abort tracking enabled.
void AbortInstance(Exception reason, bool isWorkflowThread)
{
    AbortInstance(reason, isWorkflowThread, true);
}

// Aborts the instance: faults pending requests, aborts the extensions and the creation context,
// then moves to State.Aborted and aborts the controller.
//   isWorkflowThread - true: abort inline without taking the executor lock;
//                      false: acquire the executor lock (abort priority) first, finishing in
//                      OnAbortLockAcquired if the acquisition goes async.
//   shouldTrackAbort - false: the controller is aborted without a reason so that no Abort
//                      tracking record is produced.
// The service host is always given the chance to fault, even if the abort throws.
void AbortInstance(Exception reason, bool isWorkflowThread, bool shouldTrackAbort)
{
    bool completeSelf = false;

    if (shouldTrackAbort)
    {
        FxTrace.Exception.AsWarning(reason);
    }

    FaultPendingRequests(reason);

    AbortExtensions();

    try
    {
        if (this.creationContext != null && !this.creationContextAborted)
        {
            this.creationContextAborted = true;
            this.creationContext.OnAbort();
        }

        if (isWorkflowThread)
        {
            completeSelf = true;
            if (ValidateStateForAbort())
            {
                this.state = State.Aborted;
                if (shouldTrackAbort)
                {
                    this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowAborted();
                    this.Controller.Abort(reason);
                }
                else
                {
                    // this ensures that reason is null when WorkflowInstance.Abort is called
                    // and prevents an Abort tracking record.
                    this.Controller.Abort();
                }
                DecrementBusyCount();

                // We should get off this thread because we're unsure of its state
                ScheduleAbortTracking(false);
            }
        }
        else
        {
            bool ownsLock = false;

            try
            {
                if (AcquireLockAsync(this.acquireLockTimeout, true, false, ref ownsLock, new FastAsyncCallback(OnAbortLockAcquired),
                    new AbortInstanceState(reason, shouldTrackAbort)))
                {
                    completeSelf = true;
                    if (ValidateStateForAbort())
                    {
                        this.state = State.Aborted;
                        if (shouldTrackAbort)
                        {
                            this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowAborted();
                            this.Controller.Abort(reason);
                        }
                        else
                        {
                            // this ensures that reason is null when WorkflowInstance.Abort is called
                            // and prevents an Abort tracking record.
                            this.Controller.Abort();
                        }
                        DecrementBusyCount();

                        // We need to get off this thread so we don't block the caller
                        // of abort
                        ScheduleAbortTracking(false);
                    }
                }
            }
            finally
            {
                // Only release when the lock was obtained synchronously; on the async path
                // OnAbortLockAcquired runs with the lock instead.
                if (completeSelf)
                {
                    ReleaseLock(ref ownsLock);
                }
            }
        }
    }
    finally
    {
        this.serviceHost.FaultServiceHostIfNecessary(reason);
    }
}

// Aborts the persistence context and any persistence pipeline currently in use. abortingExtensions is
// set first so that OnPersistenceContextClosed does not re-enter AbortInstance.
void AbortExtensions()
{
    this.abortingExtensions = true;

    // Need to ensure that either components see the Aborted state, this method sees the components, or both.
    Thread.MemoryBarrier();

    if (this.persistenceContext != null)
    {
        this.persistenceContext.Abort();
    }

    PersistencePipeline currentPersistencePipeline = this.persistencePipelineInUse;
    if (currentPersistencePipeline != null)
    {
        currentPersistencePipeline.Abort();
    }

    // We abandon buffered Receives only in the complete code path, not in abort code path.
    // persistenceContext is optional (see the null check above and the constructor), so it must be
    // checked again before reading AssociatedKeys.
    if (this.hasRaisedCompleted && this.bufferedReceiveManager != null && this.persistenceContext != null)
    {
        this.bufferedReceiveManager.AbandonBufferedReceives(this.persistenceContext.AssociatedKeys);
    }
}

// Disposes the extensions and, once completion has been raised, abandons any buffered Receives
// associated with this instance's keys.
void Dispose()
{
    this.DisposeExtensions();

    // We abandon buffered Receives only in the complete code path, not in abort code path.
    // persistenceContext may be null (the constructor accepts a null context), so guard the dereference.
    if (this.hasRaisedCompleted && this.bufferedReceiveManager != null && this.persistenceContext != null)
    {
        this.bufferedReceiveManager.AbandonBufferedReceives(this.persistenceContext.AssociatedKeys);
    }
}

void OnAbortLockAcquired(object state, Exception exception)
{
    if (exception != null)
    {
        // We ---- this exception because we were simply doing our
        // best to get the lock.  Note that we won't proceed without
        // the lock because we may have already succeeded on another
        // thread.  Technically this abort call has failed.

        FxTrace.Exception.AsWarning(exception);
        return;
    }

    bool ownsLock = true;
    bool shouldRaise = false;
    AbortInstanceState abortInstanceState = (AbortInstanceState)state;

    try
    {
        if (ValidateStateForAbort())
        {
            shouldRaise = true;
            this.state = State.Aborted;
            if (abortInstanceState.ShouldTrackAbort)
            {
                this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowAborted();
                this.Controller.Abort(abortInstanceState.Reason);
            }
            else
            {
                // this ensures that reason is null when WorkflowInstance.Abort is called
                // and prevents an Abort tracking record.
1040 this.Controller.Abort(); 1041 } 1042 DecrementBusyCount(); 1043 } 1044 } 1045 finally 1046 { 1047 ReleaseLock(ref ownsLock); 1048 } 1049 1050 if (shouldRaise) 1051 { 1052 // We call this from this thread because we've already 1053 // had a thread switch 1054 TrackAbort(false); 1055 } 1056 } 1057 ScheduleAbortTracking(bool isUpdateFailure)1058 void ScheduleAbortTracking(bool isUpdateFailure) 1059 { 1060 ActionItem.Schedule(new Action<object>(TrackAbort), isUpdateFailure); 1061 } 1062 1063 // This is only ever called from an appropriate thread (not the thread 1064 // that called abort unless it was an internal abort). 1065 // This method is called without the lock. We still provide single threaded 1066 // guarantees to the WorkflowInstance because: 1067 // * No other call can ever enter the executor again once the state has 1068 // switched to Aborted 1069 // * If this was an internal abort then the thread was fast pathing its 1070 // way out of the runtime and won't conflict 1071 // Or, in the case of a DynamicUpdate failure, the WorkflowInstance is 1072 // never returned from the factory method, and so will never be acessed by 1073 // another thread. TrackAbort(object state)1074 void TrackAbort(object state) 1075 { 1076 bool isUpdateFailure = (bool)state; 1077 1078 if (isUpdateFailure || this.Controller.HasPendingTrackingRecords) 1079 { 1080 try 1081 { 1082 IAsyncResult result = this.BeginFlushTrackingRecords(this.trackTimeout, Fx.ThunkCallback(new AsyncCallback(OnAbortTrackingComplete)), isUpdateFailure); 1083 1084 if (result.CompletedSynchronously) 1085 { 1086 this.Controller.EndFlushTrackingRecords(result); 1087 } 1088 else 1089 { 1090 return; 1091 } 1092 } 1093 catch (Exception e) 1094 { 1095 if (Fx.IsFatal(e)) 1096 { 1097 throw; 1098 } 1099 1100 // We ---- any exception here because we are on the abort path 1101 // and are doing a best effort to track this record. 
1102 FxTrace.Exception.AsWarning(e); 1103 } 1104 } 1105 1106 if (!isUpdateFailure) 1107 { 1108 RaiseAborted(); 1109 } 1110 } 1111 OnAbortTrackingComplete(IAsyncResult result)1112 void OnAbortTrackingComplete(IAsyncResult result) 1113 { 1114 if (result.CompletedSynchronously) 1115 { 1116 return; 1117 } 1118 1119 bool isUpdateFailure = (bool)result.AsyncState; 1120 1121 try 1122 { 1123 this.EndFlushTrackingRecords(result); 1124 } 1125 catch (Exception e) 1126 { 1127 if (Fx.IsFatal(e)) 1128 { 1129 throw; 1130 } 1131 1132 // We ---- any exception here because we are on the abort path 1133 // and are doing a best effort to track this record. 1134 FxTrace.Exception.AsWarning(e); 1135 } 1136 1137 if (!isUpdateFailure) 1138 { 1139 RaiseAborted(); 1140 } 1141 } 1142 RaiseAborted()1143 void RaiseAborted() 1144 { 1145 this.UnloadInstancePolicy.Cancel(); 1146 CompletePendingOperations(); 1147 } 1148 BeginTerminate(string reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)1149 public IAsyncResult BeginTerminate(string reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) 1150 { 1151 Fx.Assert(!String.IsNullOrEmpty(reason), "reason string must not be null or empty!"); 1152 1153 // the FaultException below is created using the FaultException(FaultReason, FaultCode) ctor instead of the FaultException(MessageFault) ctor 1154 // because the latter ctor saves the fault in its fault member. Saving the fault is problematic because faultException would serialize its 1155 // fault member and operationExecutionFault is not serializable. The faultException might need to be serialized if the workflowServiceInstance 1156 // is ever persisted since the faultException below ultimately becomes the terminationException saved with the workflowServiceInstance. 
1157 OperationExecutionFault fault = OperationExecutionFault.CreateTerminatedFault(reason); 1158 return BeginTerminate(new FaultException(fault.Reason, fault.Code), transaction, timeout, callback, state); 1159 } 1160 BeginTerminate(Exception reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)1161 IAsyncResult BeginTerminate(Exception reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) 1162 { 1163 Fx.Assert(reason != null, "reason must not be null!"); 1164 return TerminateAsyncResult.Create(this, reason, transaction, timeout, callback, state); 1165 } 1166 EndTerminate(IAsyncResult result)1167 public void EndTerminate(IAsyncResult result) 1168 { 1169 this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowTerminated(); 1170 TerminateAsyncResult.End(result); 1171 } 1172 BeginCancel(Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)1173 public IAsyncResult BeginCancel(Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) 1174 { 1175 return CancelAsyncResult.Create(this, transaction, timeout, callback, state); 1176 } 1177 EndCancel(IAsyncResult result)1178 public void EndCancel(IAsyncResult result) 1179 { 1180 CancelAsyncResult.End(result); 1181 } 1182 RunCore()1183 void RunCore() 1184 { 1185 this.isRunnable = true; 1186 this.state = State.Active; 1187 } 1188 BeginRun(Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)1189 public IAsyncResult BeginRun(Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) 1190 { 1191 return BeginRun(transaction, null, timeout, callback, state); 1192 } 1193 BeginRun(Transaction transaction, string operationName, TimeSpan timeout, AsyncCallback callback, object state)1194 public IAsyncResult BeginRun(Transaction transaction, string operationName, TimeSpan timeout, AsyncCallback callback, object state) 1195 { 1196 return RunAsyncResult.Create(this, 
transaction, operationName, timeout, callback, state); 1197 } 1198 EndRun(IAsyncResult result)1199 public void EndRun(IAsyncResult result) 1200 { 1201 RunAsyncResult.End(result); 1202 } 1203 OnNotifyPaused()1204 protected override void OnNotifyPaused() 1205 { 1206 bool ownsLock = true; 1207 bool keepLock = false; 1208 1209 try 1210 { 1211 this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowExecuting(false); 1212 if (ShouldRaiseComplete) 1213 { 1214 PrepareNextIdleWaiter(); 1215 1216 Exception abortException = null; 1217 1218 try 1219 { 1220 // We're about to notify the world that this instance is completed 1221 // so let's make it official. 1222 this.hasRaisedCompleted = true; 1223 this.state = State.Completed; 1224 GetCompletionState(); 1225 if (this.completionState == ActivityInstanceState.Closed) 1226 { 1227 this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowCompleted(); 1228 } 1229 1230 if (this.Controller.HasPendingTrackingRecords) 1231 { 1232 IAsyncResult result = this.Controller.BeginFlushTrackingRecords(this.trackTimeout, TrackCompleteDoneCallback, this); 1233 1234 if (result.CompletedSynchronously) 1235 { 1236 this.Controller.EndFlushTrackingRecords(result); 1237 } 1238 else 1239 { 1240 keepLock = true; 1241 return; 1242 } 1243 } 1244 1245 this.handlerThreadId = Thread.CurrentThread.ManagedThreadId; 1246 1247 try 1248 { 1249 this.isInHandler = true; 1250 OnCompleted(); 1251 } 1252 finally 1253 { 1254 this.isInHandler = false; 1255 } 1256 } 1257 catch (Exception e) 1258 { 1259 if (Fx.IsFatal(e)) 1260 { 1261 throw; 1262 } 1263 1264 abortException = e; 1265 } 1266 1267 if (abortException != null) 1268 { 1269 AbortInstance(abortException, true); 1270 } 1271 } 1272 else if (this.Controller.State == WorkflowInstanceState.Aborted) 1273 { 1274 Exception abortReason = this.Controller.GetAbortReason(); 1275 this.AbortInstance(abortReason, true); 1276 } 1277 else if (ShouldRaiseIdle) 1278 { 1279 
this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowIdle(); 1280 1281 PrepareNextIdleWaiter(); 1282 1283 if (this.Controller.TrackingEnabled) 1284 { 1285 this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Idle, this.DefinitionIdentity)); 1286 IAsyncResult result = this.Controller.BeginFlushTrackingRecords(this.trackTimeout, TrackIdleDoneCallback, this); 1287 1288 if (result.CompletedSynchronously) 1289 { 1290 this.Controller.EndFlushTrackingRecords(result); 1291 } 1292 else 1293 { 1294 keepLock = true; 1295 return; 1296 } 1297 } 1298 1299 this.handlerThreadId = Thread.CurrentThread.ManagedThreadId; 1300 1301 try 1302 { 1303 this.isInHandler = true; 1304 OnIdle(); 1305 } 1306 finally 1307 { 1308 this.isInHandler = false; 1309 } 1310 } 1311 else 1312 { 1313 NotifyCheckCanPersistWaiters(ref ownsLock); 1314 } 1315 } 1316 finally 1317 { 1318 if (!keepLock) 1319 { 1320 ReleaseLock(ref ownsLock); 1321 } 1322 } 1323 } 1324 1325 // Note: this is runtime generated Abort such as Transaction failure OnRequestAbort(Exception reason)1326 protected override void OnRequestAbort(Exception reason) 1327 { 1328 AbortInstance(reason, false); 1329 } 1330 OnTrackCompleteDone(IAsyncResult result)1331 static void OnTrackCompleteDone(IAsyncResult result) 1332 { 1333 if (result.CompletedSynchronously) 1334 { 1335 return; 1336 } 1337 1338 WorkflowServiceInstance thisPtr = (WorkflowServiceInstance)result.AsyncState; 1339 bool ownsLock = true; 1340 1341 try 1342 { 1343 thisPtr.Controller.EndFlushTrackingRecords(result); 1344 1345 thisPtr.handlerThreadId = Thread.CurrentThread.ManagedThreadId; 1346 1347 try 1348 { 1349 thisPtr.isInHandler = true; 1350 thisPtr.OnCompleted(); 1351 } 1352 finally 1353 { 1354 thisPtr.isInHandler = false; 1355 } 1356 } 1357 finally 1358 { 1359 thisPtr.ReleaseLock(ref ownsLock); 1360 } 1361 } 1362 OnTrackIdleDone(IAsyncResult result)1363 static void OnTrackIdleDone(IAsyncResult result) 1364 
{ 1365 if (result.CompletedSynchronously) 1366 { 1367 return; 1368 } 1369 1370 WorkflowServiceInstance thisPtr = (WorkflowServiceInstance)result.AsyncState; 1371 bool ownsLock = true; 1372 1373 try 1374 { 1375 thisPtr.Controller.EndFlushTrackingRecords(result); 1376 1377 thisPtr.handlerThreadId = Thread.CurrentThread.ManagedThreadId; 1378 1379 try 1380 { 1381 thisPtr.isInHandler = true; 1382 thisPtr.OnIdle(); 1383 } 1384 finally 1385 { 1386 thisPtr.isInHandler = false; 1387 } 1388 } 1389 finally 1390 { 1391 thisPtr.ReleaseLock(ref ownsLock); 1392 } 1393 } 1394 OnNotifyUnhandledException(Exception exception, Activity exceptionSource, string exceptionSourceInstanceId)1395 protected override void OnNotifyUnhandledException(Exception exception, Activity exceptionSource, 1396 string exceptionSourceInstanceId) 1397 { 1398 bool ownsLock = true; 1399 bool keepLock = false; 1400 UnhandledExceptionAsyncData data = new UnhandledExceptionAsyncData(this, exception, exceptionSource); 1401 1402 try 1403 { 1404 if (this.Controller.HasPendingTrackingRecords) 1405 { 1406 IAsyncResult result = this.Controller.BeginFlushTrackingRecords(this.trackTimeout, TrackUnhandledExceptionDoneCallback, data); 1407 1408 if (result.CompletedSynchronously) 1409 { 1410 this.Controller.EndFlushTrackingRecords(result); 1411 } 1412 else 1413 { 1414 keepLock = true; 1415 return; 1416 } 1417 } 1418 1419 this.handlerThreadId = Thread.CurrentThread.ManagedThreadId; 1420 1421 try 1422 { 1423 this.isInHandler = true; 1424 OnUnhandledException(data); 1425 } 1426 finally 1427 { 1428 this.isInHandler = false; 1429 } 1430 } 1431 finally 1432 { 1433 if (!keepLock) 1434 { 1435 ReleaseLock(ref ownsLock); 1436 } 1437 } 1438 } 1439 OnTrackUnhandledExceptionDone(IAsyncResult result)1440 static void OnTrackUnhandledExceptionDone(IAsyncResult result) 1441 { 1442 if (result.CompletedSynchronously) 1443 { 1444 return; 1445 } 1446 1447 UnhandledExceptionAsyncData data = (UnhandledExceptionAsyncData)result.AsyncState; 1448 
WorkflowServiceInstance thisPtr = data.Instance; 1449 1450 bool ownsLock = true; 1451 1452 try 1453 { 1454 thisPtr.Controller.EndFlushTrackingRecords(result); 1455 1456 thisPtr.handlerThreadId = Thread.CurrentThread.ManagedThreadId; 1457 1458 try 1459 { 1460 thisPtr.isInHandler = true; 1461 thisPtr.OnUnhandledException(data); 1462 } 1463 finally 1464 { 1465 thisPtr.isInHandler = false; 1466 } 1467 } 1468 finally 1469 { 1470 thisPtr.ReleaseLock(ref ownsLock); 1471 } 1472 } 1473 BeginSuspend(bool isUnlocked, string reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)1474 public IAsyncResult BeginSuspend(bool isUnlocked, string reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) 1475 { 1476 return SuspendAsyncResult.Create(this, isUnlocked, reason, transaction, timeout, callback, state); 1477 } 1478 EndSuspend(IAsyncResult result)1479 public void EndSuspend(IAsyncResult result) 1480 { 1481 SuspendAsyncResult.End(result); 1482 } 1483 BeginUnsuspend(Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)1484 public IAsyncResult BeginUnsuspend(Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state) 1485 { 1486 return UnsuspendAsyncResult.Create(this, transaction, timeout, callback, state); 1487 } 1488 EndUnsuspend(IAsyncResult result)1489 public void EndUnsuspend(IAsyncResult result) 1490 { 1491 UnsuspendAsyncResult.End(result); 1492 } 1493 GetCompletionState()1494 void GetCompletionState() 1495 { 1496 this.completionState = this.Controller.GetCompletionState(out this.workflowOutputs, out this.terminationException); 1497 } 1498 TrackPersistence(PersistenceOperation operation)1499 void TrackPersistence(PersistenceOperation operation) 1500 { 1501 if (this.Controller.TrackingEnabled) 1502 { 1503 if (operation == PersistenceOperation.Delete) 1504 { 1505 this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, 
WorkflowInstanceStates.Deleted, this.DefinitionIdentity)); 1506 } 1507 else if (operation == PersistenceOperation.Unload) 1508 { 1509 this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowUnloaded(); 1510 this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Unloaded, this.DefinitionIdentity)); 1511 } 1512 else 1513 { 1514 this.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowPersisted(); 1515 this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Persisted, this.DefinitionIdentity)); 1516 } 1517 } 1518 } 1519 GeneratePersistenceData()1520 Dictionary<XName, InstanceValue> GeneratePersistenceData() 1521 { 1522 Dictionary<XName, InstanceValue> data = new Dictionary<XName, InstanceValue>(10); 1523 data[WorkflowNamespace.Bookmarks] = new InstanceValue(Controller.GetBookmarks(), InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional); 1524 data[WorkflowNamespace.LastUpdate] = new InstanceValue(DateTime.UtcNow, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional); 1525 1526 foreach (KeyValuePair<string, LocationInfo> mappedVariable in Controller.GetMappedVariables()) 1527 { 1528 data[WorkflowNamespace.VariablesPath.GetName(mappedVariable.Key)] = new InstanceValue(mappedVariable.Value, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional); 1529 } 1530 1531 Fx.AssertAndThrow(Controller.State != WorkflowInstanceState.Aborted, "Cannot generate data for an aborted service instance."); 1532 if (Controller.State != WorkflowInstanceState.Complete) 1533 { 1534 data[WorkflowNamespace.Workflow] = new InstanceValue(Controller.PrepareForSerialization()); 1535 1536 if (this.creationContext != null) 1537 { 1538 data[WorkflowServiceNamespace.CreationContext] = new InstanceValue(this.creationContext); 1539 } 1540 1541 data[WorkflowNamespace.Status] = new InstanceValue(Controller.State == WorkflowInstanceState.Idle 
? "Idle" : "Executing", InstanceValueOptions.WriteOnly); 1542 } 1543 else 1544 { 1545 data[WorkflowNamespace.Workflow] = new InstanceValue(Controller.PrepareForSerialization(), InstanceValueOptions.Optional); 1546 1547 this.GetCompletionState(); 1548 1549 if (this.completionState == ActivityInstanceState.Faulted) 1550 { 1551 data[WorkflowNamespace.Status] = new InstanceValue("Faulted", InstanceValueOptions.WriteOnly); 1552 data[WorkflowNamespace.Exception] = new InstanceValue(this.terminationException, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional); 1553 } 1554 else if (this.completionState == ActivityInstanceState.Closed) 1555 { 1556 data[WorkflowNamespace.Status] = new InstanceValue("Closed", InstanceValueOptions.WriteOnly); 1557 if (this.workflowOutputs != null) 1558 { 1559 foreach (KeyValuePair<string, object> output in this.workflowOutputs) 1560 { 1561 data[WorkflowNamespace.OutputPath.GetName(output.Key)] = new InstanceValue(output.Value, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional); 1562 } 1563 } 1564 } 1565 else 1566 { 1567 Fx.AssertAndThrow(this.completionState == ActivityInstanceState.Canceled, "Cannot be executing a service instance when WorkflowState was completed."); 1568 data[WorkflowNamespace.Status] = new InstanceValue("Canceled", InstanceValueOptions.WriteOnly); 1569 } 1570 } 1571 return data; 1572 } 1573 BeginPersist(TimeSpan timeout, AsyncCallback callback, object state)1574 public IAsyncResult BeginPersist(TimeSpan timeout, AsyncCallback callback, object state) 1575 { 1576 return BeginPersist(false, timeout, callback, state); 1577 } 1578 BeginPersist(bool isTry, TimeSpan timeout, AsyncCallback callback, object state)1579 IAsyncResult BeginPersist(bool isTry, TimeSpan timeout, AsyncCallback callback, object state) 1580 { 1581 return new UnloadOrPersistAsyncResult(this, this.Controller.State == WorkflowInstanceState.Complete ? 
PersistenceOperation.Delete : PersistenceOperation.Save, false, isTry, 1582 timeout, callback, state); 1583 } 1584 EndPersist(IAsyncResult result)1585 public bool EndPersist(IAsyncResult result) 1586 { 1587 return UnloadOrPersistAsyncResult.End(result); 1588 } 1589 OnBeginFlushTrackingRecords(AsyncCallback callback, object state)1590 protected override IAsyncResult OnBeginFlushTrackingRecords(AsyncCallback callback, object state) 1591 { 1592 return this.Controller.BeginFlushTrackingRecords(this.trackTimeout, callback, state); 1593 } 1594 OnEndFlushTrackingRecords(IAsyncResult result)1595 protected override void OnEndFlushTrackingRecords(IAsyncResult result) 1596 { 1597 this.Controller.EndFlushTrackingRecords(result); 1598 } 1599 OnBeginPersist(AsyncCallback callback, object state)1600 protected override IAsyncResult OnBeginPersist(AsyncCallback callback, object state) 1601 { 1602 return new UnloadOrPersistAsyncResult(this, PersistenceOperation.Save, true, false, TimeSpan.MaxValue, callback, state); 1603 } 1604 OnEndPersist(IAsyncResult result)1605 protected override void OnEndPersist(IAsyncResult result) 1606 { 1607 UnloadOrPersistAsyncResult.End(result); 1608 } 1609 OnBeginAssociateKeys(ICollection<InstanceKey> keys, AsyncCallback callback, object state)1610 protected override IAsyncResult OnBeginAssociateKeys(ICollection<InstanceKey> keys, AsyncCallback callback, object state) 1611 { 1612 if (this.persistenceContext == null) 1613 { 1614 return new CompletedAsyncResult(callback, state); 1615 } 1616 else 1617 { 1618 return this.persistenceContext.BeginAssociateKeys(keys, this.persistTimeout, callback, state); 1619 } 1620 } 1621 OnEndAssociateKeys(IAsyncResult result)1622 protected override void OnEndAssociateKeys(IAsyncResult result) 1623 { 1624 if (this.persistenceContext == null) 1625 { 1626 CompletedAsyncResult.End(result); 1627 } 1628 else 1629 { 1630 this.persistenceContext.EndAssociateKeys(result); 1631 } 1632 } 1633 
// Disassociates keys through the persistence context; a no-op without one.
protected override void OnDisassociateKeys(ICollection<InstanceKey> keys)
{
    if (this.persistenceContext == null)
    {
        return;
    }

    this.persistenceContext.DisassociateKeys(keys);
}

// Schedules the bookmark resumption on the controller (scoped when bookmarkScope is given).
// When the bookmark is NotReady, buffered receive is off and the host's
// FilterResumeTimeout is positive, waitHandle is created (or reset), registered as a
// next-idle waiter and the lock is released so the workflow can keep running.
// Returns the controller's resumption result in every case.
BookmarkResumptionResult ResumeProtocolBookmarkCore(Bookmark bookmark, object value, BookmarkScope bookmarkScope, bool bufferedReceiveEnabled, ref AsyncWaitHandle waitHandle, ref bool ownsLock)
{
    Fx.Assert(this.state == State.Active, "WorkflowServiceInstance.State should be State.Active at this point.");

    BookmarkResumptionResult resumption = bookmarkScope == null
        ? this.Controller.ScheduleBookmarkResumption(bookmark, value)
        : this.Controller.ScheduleBookmarkResumption(bookmark, value, bookmarkScope);

    bool waitForNextIdle = resumption == BookmarkResumptionResult.NotReady
        && !bufferedReceiveEnabled
        && (this.serviceHost.FilterResumeTimeout.TotalSeconds > 0);
    if (!waitForNextIdle)
    {
        return resumption;
    }

    if (waitHandle != null)
    {
        waitHandle.Reset();
    }
    else
    {
        waitHandle = new AsyncWaitHandle();
    }

    // Creation doesn't require the lock since it is guarded
    // by the executor lock.
    if (this.nextIdleWaiters == null)
    {
        this.nextIdleWaiters = new List<AsyncWaitHandle>();
    }

    lock (this.activeOperationsLock)
    {
        this.nextIdleWaiters.Add(waitHandle);
    }

    // We release the lock here so that the workflow will continue to process
    // until the NextIdle waiters get notified
    ReleaseLock(ref ownsLock);

    return resumption;
}

// Starts resuming a protocol bookmark. When value is a WorkflowOperationContext, the
// context is recorded as a pending request (unless it already has a response) and its
// BookmarkValue is what gets delivered to the bookmark.
[Fx.Tag.Throws(typeof(TimeoutException), "Either the execution lock could not be acquired or the target sub-instance did not become stable in the allotted time.")]
public IAsyncResult BeginResumeProtocolBookmark(Bookmark bookmark, BookmarkScope bookmarkScope, object value, TimeSpan timeout, AsyncCallback callback, object state)
{
    Fx.Assert(bookmark != null, "bookmark must not be null!");

    object resumptionValue = value;
    WorkflowOperationContext operationContext = value as WorkflowOperationContext;
    if (operationContext != null)
    {
        if (!operationContext.HasResponse)
        {
            lock (this.thisLock)
            {
                this.pendingRequests.Add(operationContext);
            }
        }
        resumptionValue = operationContext.BookmarkValue;
    }

    return new ResumeProtocolBookmarkAsyncResult(this, bookmark, resumptionValue, bookmarkScope, true, timeout, callback, state);
}

// Completes BeginResumeProtocolBookmark and returns the resumption result.
[Fx.Tag.InheritThrows(From = "ResumeProtocolBookmark")]
public BookmarkResumptionResult EndResumeProtocolBookmark(IAsyncResult result)
{
    return ResumeProtocolBookmarkAsyncResult.End(result);
}

// Resumes a bookmark through the same async result as the protocol path, with no
// bookmark scope and 'false' for the flag that the protocol path passes as 'true'.
protected override IAsyncResult OnBeginResumeBookmark(Bookmark bookmark, object value, TimeSpan timeout, AsyncCallback callback, object state)
{
    return new ResumeProtocolBookmarkAsyncResult(this, bookmark, value, null, false, timeout, callback, state);
}

// Completes OnBeginResumeBookmark.
protected override BookmarkResumptionResult OnEndResumeBookmark(IAsyncResult result)
{
    return ResumeProtocolBookmarkAsyncResult.End(result);
}

// Switches the instance to State.Unloaded, aborts the controller unless the workflow
// already completed, and decrements the busy count.
void MarkUnloaded()
{
    this.state = State.Unloaded;

    // don't abort completed instances
    bool workflowCompleted = this.Controller.State == WorkflowInstanceState.Complete;
    if (!workflowCompleted)
    {
        this.Controller.Abort();
    }

    DecrementBusyCount();
}

// This always happens under executor lock
void AddCheckCanPersistWaiter(WaitForCanPersistAsyncResult result)
{
    // Creation doesn't require the lock since it is guarded
    // by the executor lock.
    if (this.checkCanPersistWaiters == null)
    {
        this.checkCanPersistWaiters = new List<WaitForCanPersistAsyncResult>();
    }

    this.checkCanPersistWaiters.Add(result);
}

// This always happens under executor lock
// Signals every registered can-persist waiter once the controller is persistable. The
// waiter list is detached before the waiters are signalled.
void NotifyCheckCanPersistWaiters(ref bool ownsLock)
{
    // Always guarded by the executor lock.
    if (this.checkCanPersistWaiters == null || this.checkCanPersistWaiters.Count == 0 || !this.Controller.IsPersistable)
    {
        return;
    }

    List<WaitForCanPersistAsyncResult> pendingWaiters = this.checkCanPersistWaiters;
    this.checkCanPersistWaiters = null;
    foreach (WaitForCanPersistAsyncResult pendingWaiter in pendingWaiters)
    {
        pendingWaiter.SetEvent(ref ownsLock);
    }
}

// Starts waiting until the instance can be persisted.
IAsyncResult BeginWaitForCanPersist(ref bool ownsLock, TimeSpan timeout, AsyncCallback callback, object state)
{
    return new WaitForCanPersistAsyncResult(this, ref ownsLock, timeout, callback, state);
}

// Completes BeginWaitForCanPersist; ownsLock is updated by the async result.
void EndWaitForCanPersist(IAsyncResult result, ref bool ownsLock)
{
    WaitForCanPersistAsyncResult.End(result, ref ownsLock);
}

// Throws an "aborted" FaultException when the instance is in State.Aborted.
void ThrowIfAborted()
{
    if (this.state == State.Aborted)
    {
        throw FxTrace.Exception.AsError(
            new FaultException(OperationExecutionFault.CreateAbortedFault(SR.WorkflowInstanceAborted(this.Id))));
    }
}

// Throws once completion has been raised: a "terminated" fault when a termination
// exception was recorded, otherwise a "completed" fault.
void ThrowIfTerminatedOrCompleted()
{
    if (!this.hasRaisedCompleted)
    {
        return;
    }

    if (this.terminationException != null)
    {
        throw FxTrace.Exception.AsError(
            new FaultException(OperationExecutionFault.CreateTerminatedFault(SR.WorkflowInstanceTerminated(this.Id))));
    }

    throw FxTrace.Exception.AsError(
        new FaultException(OperationExecutionFault.CreateCompletedFault(SR.WorkflowInstanceCompleted(this.Id))));
}

// Throws an "instance unloaded" FaultException when the instance is in State.Unloaded.
void ThrowIfUnloaded()
{
    if (this.state == State.Unloaded)
    {
        throw FxTrace.Exception.AsError(
            new FaultException(OperationExecutionFault.CreateInstanceUnloadedFault(SR.WorkflowInstanceUnloaded(this.Id))));
    }
}

// Throws InvalidOperationException when the instance is in State.Suspended.
void ThrowIfSuspended()
{
    if (this.state == State.Suspended)
    {
        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.InstanceMustNotBeSuspended));
    }
}

// Throws InvalidOperationException when no persistence context is available.
void ThrowIfNoPersistenceProvider()
{
    if (this.persistenceContext == null)
    {
        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.PersistenceProviderRequiredToPersist));
    }
}

// Validates that a suspend may proceed; always returns true when it does not throw.
bool ValidateStateForSuspend(Transaction transaction)
{
    // Note: we allow suspend even when suspended to update Suspended reason.

    Validate(
        transaction == null ? XD2.WorkflowInstanceManagementService.Suspend : XD2.WorkflowInstanceManagementService.TransactedSuspend,
        transaction,
        true);

    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfTerminatedOrCompleted();
    ThrowIfUnloaded();

    return true;
}

// Validates that an unsuspend may proceed. Returns false (nothing to do) when the
// instance is already active.
bool ValidateStateForUnsuspend(Transaction transaction)
{
    if (this.state == State.Active)
    {
        return false;
    }

    Validate(
        transaction == null ? XD2.WorkflowInstanceManagementService.Unsuspend : XD2.WorkflowInstanceManagementService.TransactedUnsuspend,
        transaction,
        true);

    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfTerminatedOrCompleted();
    ThrowIfUnloaded();

    return true;
}

// Validates that a run may proceed. Returns false when there is nothing to do:
// completion was already raised, the instance is active and runnable, or it is in a
// transaction. operationName, when non-null, replaces the default Run operation name.
bool ValidateStateForRun(Transaction transaction, string operationName)
{
    bool alreadyRunning = this.state == State.Active && this.isRunnable;
    if (this.hasRaisedCompleted || alreadyRunning || this.isInTransaction)
    {
        return false;
    }

    Validate(
        operationName ?? (transaction == null ? XD2.WorkflowInstanceManagementService.Run : XD2.WorkflowInstanceManagementService.TransactedRun),
        transaction,
        true);

    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfUnloaded();
    ThrowIfSuspended();

    return true;
}

// Throws unless the instance can accept a protocol bookmark resumption.
void ValidateStateForResumeProtocolBookmark()
{
    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfTerminatedOrCompleted();
    ThrowIfUnloaded();
    ThrowIfSuspended();
}

// Throws when keys cannot be associated because the instance is suspended.
void ValidateStateForAssociateKeys()
{
    // WorkflowInstanceException validations
    ThrowIfSuspended();
}

// Reports whether bookmarks cannot be resumed in the current state and, through result,
// which resumption result describes the situation: NotFound after completion, NotReady
// when unloaded / aborted / suspended, Success otherwise.
bool AreBookmarksInvalid(out BookmarkResumptionResult result)
{
    if (this.hasRaisedCompleted)
    {
        result = BookmarkResumptionResult.NotFound;
        return true;
    }

    bool notReady = this.state == State.Unloaded || this.state == State.Aborted || this.state == State.Suspended;
    if (notReady)
    {
        result = BookmarkResumptionResult.NotReady;
        return true;
    }

    result = BookmarkResumptionResult.Success;
    return false;
}

// An abort is valid unless the instance is already aborted.
bool ValidateStateForAbort()
{
    return this.state != State.Aborted;
}

// Validates that a cancel may proceed. Returns false when completion was already raised.
bool ValidateStateForCancel(Transaction transaction)
{
    if (this.hasRaisedCompleted)
    {
        return false;
    }

    Validate(
        transaction == null ? XD2.WorkflowInstanceManagementService.Cancel : XD2.WorkflowInstanceManagementService.TransactedCancel,
        transaction,
        true);

    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfUnloaded();

    return true;
}

// Throws unless the instance can be persisted.
void ValidateStateForPersist()
{
    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfUnloaded();

    // Other validations
    ThrowIfNoPersistenceProvider();
}

// Validates that an unload may proceed. Returns false when the instance is already
// unloaded. A persistence provider is only required while the workflow is not complete.
bool ValidateStateForUnload()
{
    if (this.state == State.Unloaded)
    {
        return false;
    }

    // WorkflowInstanceException validations
    ThrowIfAborted();

    // Other validations
    bool workflowCompleted = this.Controller.State == WorkflowInstanceState.Complete;
    if (!workflowCompleted)
    {
        ThrowIfNoPersistenceProvider();
    }

    return true;
}

// Validates that a terminate may proceed; always returns true when it does not throw.
bool ValidateStateForTerminate(Transaction transaction)
{
    Validate(
        transaction == null ? XD2.WorkflowInstanceManagementService.Terminate : XD2.WorkflowInstanceManagementService.TransactedTerminate,
        transaction,
        true);

    // WorkflowInstanceException validations
    ThrowIfAborted();
    ThrowIfTerminatedOrCompleted();
    ThrowIfUnloaded();

    return true;
}

// Parameterless callback signature.
delegate void InvokeCompletedCallback();

// The kind of persistence operation being performed.
enum PersistenceOperation : byte
{
    Delete,
    Save,
    Unload
}

// Bundles an instance with a FastAsyncCallback and its state object.
struct AcquireLockAsyncData
{
    WorkflowServiceInstance instance;
    FastAsyncCallback callback;
    object state;

    public AcquireLockAsyncData(WorkflowServiceInstance instance, FastAsyncCallback callback, object state)
    {
        this.instance = instance;
        this.callback = callback;
        this.state = state;
    }

    public WorkflowServiceInstance Instance
    {
        get { return this.instance; }
    }

    public FastAsyncCallback Callback
    {
        get { return this.callback; }
    }

    public object State
    {
        get { return this.state; }
    }
}

// State object handed to OnAbortLockAcquired when AbortInstance acquires the lock asynchronously.
class AbortInstanceState
{
    public AbortInstanceState(Exception reason, bool shouldTrackAbort)
    {
        this.Reason = reason;
        this.ShouldTrackAbort = shouldTrackAbort;
    }

    // The exception that caused the abort.
    public Exception Reason
    {
        get;
        private set;
    }

    // Whether the abort should be tracked.
    public bool ShouldTrackAbort
    {
        get;
        private set;
    }
}

// Starts an attempt to acquire a reference on this instance; the work is done by
// TryAcquireReferenceAsyncResult.
public IAsyncResult BeginTryAcquireReference(TimeSpan timeout, AsyncCallback callback, object state)
{
    return new TryAcquireReferenceAsyncResult(this, timeout, callback, state);
}

// Completes BeginTryAcquireReference and returns the value produced by
// TryAcquireReferenceAsyncResult.End.
public bool EndTryAcquireReference(IAsyncResult result)
{
    return TryAcquireReferenceAsyncResult.End(result);
}

// Starts an asynchronous release of this instance; isTryUnload is forwarded to
// ReleaseInstanceAsyncResult, which uses it to pick the try-unload path over a plain unload.
public IAsyncResult BeginReleaseInstance(bool isTryUnload, TimeSpan timeout, AsyncCallback callback, object state)
{
    return new ReleaseInstanceAsyncResult(this, isTryUnload, timeout, callback, state);
}

// Completes BeginReleaseInstance.
public void EndReleaseInstance(IAsyncResult result)
{
    ReleaseInstanceAsyncResult.End(result);
}

// Static counterpart of EndReleaseInstance; completes a ReleaseInstanceAsyncResult
// without needing the instance.
public static void EndReleaseInstanceForClose(IAsyncResult result)
{
    ReleaseInstanceAsyncResult.End(result);
}

// Starts an asynchronous association of the given infrastructure keys with this instance.
public IAsyncResult BeginAssociateInfrastructureKeys(ICollection<InstanceKey> associatedKeys, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)
{
    return new AssociateKeysAsyncResult(this, associatedKeys, transaction, timeout, callback, state);
}

// Completes BeginAssociateInfrastructureKeys.
public void EndAssociateInfrastructureKeys(IAsyncResult result)
{
    AssociateKeysAsyncResult.End(result);
}

// Removes the operation context from the pending requests, under thisLock.
public void ReleaseContext(WorkflowOperationContext context)
{
    lock (this.thisLock)
    {
        this.pendingRequests.Remove(context);
    }
}

// Queues a pending operation for the session. The first entry in a session's queue is
// created as "first request"; later entries are created as waiters.
// The per-session table is created lazily.
public IAsyncResult BeginWaitForPendingOperations(string sessionId, TimeSpan timeout, AsyncCallback callback, object state)
{
    PendingOperationAsyncResult pendingOperation;
    lock (this.thisLock)
    {
        if (this.pendingOperations == null)
        {
            this.pendingOperations = new Dictionary<string, List<PendingOperationAsyncResult>>();
        }

        List<PendingOperationAsyncResult> sessionQueue;
        bool isFirstRequest;
        if (this.pendingOperations.TryGetValue(sessionId, out sessionQueue))
        {
            isFirstRequest = (sessionQueue.Count == 0);
        }
        else
        {
            sessionQueue = new List<PendingOperationAsyncResult>();
            this.pendingOperations.Add(sessionId, sessionQueue);
            isFirstRequest = true;
        }

        pendingOperation = new PendingOperationAsyncResult(isFirstRequest, timeout, callback, state);
        sessionQueue.Add(pendingOperation);
        this.pendingOperationCount++;
    }

    // Started only after thisLock has been released.
    pendingOperation.Start();
    return pendingOperation;
}

// Completes BeginWaitForPendingOperations.
public void EndWaitForPendingOperations(IAsyncResult result)
{
    PendingOperationAsyncResult.End(result);
}

// Removes the async result from the session's queue. The result could represent the operation
// currently being processed for the session, or an operation that timed out while waiting to
// reach the head of the queue.
// Also note that if the instance has already completed/aborted etc., all pending operations
// would call OnWorkflowOperationCompleted simultaneously and this.pendingOperations would be null.
public void RemovePendingOperation(string sessionId, IAsyncResult result)
{
    lock (this.thisLock)
    {
        if (this.pendingOperations == null)
        {
            return;
        }

        List<PendingOperationAsyncResult> sessionQueue;
        if (!this.pendingOperations.TryGetValue(sessionId, out sessionQueue) || sessionQueue.Count == 0)
        {
            return;
        }

        // In the happy path, RemovePendingOperation might get called more than once
        // (HandleEndResume & ProcessReply); wasInProcess would be false the second time.
        // When wasInProcess is false, we do not unblock the next item in the list.
        bool wasInProcess = sessionQueue[0] == result;

        if (sessionQueue.Remove((PendingOperationAsyncResult)result))
        {
            --this.pendingOperationCount;
        }

        if (sessionQueue.Count == 0)
        {
            this.pendingOperations.Remove(sessionId);
        }
        else if (wasInProcess)
        {
            // signal the next request to resume bookmark
            sessionQueue[0].Unblock();
        }
    }
}

// Unblocks every queued pending operation, then drops the table and resets the counter.
// All of it happens under thisLock.
void CompletePendingOperations()
{
    lock (this.thisLock)
    {
        if (this.pendingOperations != null)
        {
            foreach (List<PendingOperationAsyncResult> sessionQueue in this.pendingOperations.Values)
            {
                foreach (PendingOperationAsyncResult pendingOperation in sessionQueue)
                {
                    pendingOperation.Unblock();
                }
            }
        }

        this.pendingOperations = null;
        this.pendingOperationCount = 0;
    }
}

// When a BufferedReceiveManager is present, refreshes the persistence context's bookmarks
// from the controller and asks the manager to retry with the associated keys and bookmarks.
void OnIdle()
{
    if (this.BufferedReceiveManager == null)
    {
        return;
    }

    this.persistenceContext.Bookmarks = this.Controller.GetBookmarks();
    this.BufferedReceiveManager.Retry(this.persistenceContext.AssociatedKeys, this.persistenceContext.Bookmarks);
}

// Faults the pending requests (terminated fault when a termination exception was recorded,
// completed fault otherwise), starts releasing the instance with no try-unload and an
// unbounded timeout, and finally unblocks all pending operations.
void OnCompleted()
{
    FaultException completionFault;
    if (this.terminationException != null)
    {
        completionFault = new FaultException(OperationExecutionFault.CreateTerminatedFault(SR.WorkflowInstanceTerminated(this.Id)));
    }
    else
    {
        completionFault = new FaultException(OperationExecutionFault.CreateCompletedFault(SR.WorkflowInstanceCompleted(this.Id)));
    }
    FaultPendingRequests(completionFault);

    // The callback delegate is created on first use and cached in a static field.
    if (handleEndReleaseInstance == null)
    {
        handleEndReleaseInstance = Fx.ThunkCallback(new AsyncCallback(HandleEndReleaseInstance));
    }

    IAsyncResult releaseResult = this.BeginReleaseInstance(false, TimeSpan.MaxValue, handleEndReleaseInstance, this);
    if (releaseResult.CompletedSynchronously)
    {
        // HandleEndReleaseInstance ignores synchronous completions, so finish it here.
        OnReleaseInstance(releaseResult);
    }

    CompletePendingOperations();
}

// Async callback for BeginReleaseInstance. Synchronous completions are ignored here
// because OnCompleted finishes those itself.
static void HandleEndReleaseInstance(IAsyncResult result)
{
    if (!result.CompletedSynchronously)
    {
        WorkflowServiceInstance thisPtr = (WorkflowServiceInstance)result.AsyncState;
        thisPtr.OnReleaseInstance(result);
    }
}

// Ends the release operation; a non-fatal failure aborts the instance (second argument false),
// a fatal one is rethrown.
void OnReleaseInstance(IAsyncResult result)
{
    try
    {
        this.EndReleaseInstance(result);
    }
    catch (Exception releaseException)
    {
        if (Fx.IsFatal(releaseException))
        {
            throw;
        }

        this.AbortInstance(releaseException, false);
    }
}

// Faults the pending requests with the unhandled exception, then hands the data to the
// unhandled exception policy.
void OnUnhandledException(UnhandledExceptionAsyncData data)
{
    Fx.Assert(data != null, "data must not be null!");
    Fx.Assert(data.Exception != null, "data.Exception must not be null!");

    Exception unhandledException = data.Exception;
    FaultPendingRequests(unhandledException);
    this.UnhandledExceptionPolicy.OnUnhandledException(data);
}

// notify pending requests so that clients don't hang
// The requests are snapshotted and cleared under thisLock; the faults are sent after the
// lock has been released.
void FaultPendingRequests(Exception e)
{
    WorkflowOperationContext[] requestsToFault;

    lock (this.thisLock)
    {
        if (this.pendingRequests.Count == 0)
        {
            return;
        }

        requestsToFault = this.pendingRequests.ToArray();
        this.pendingRequests.Clear();
    }

    foreach (WorkflowOperationContext request in requestsToFault)
    {
        request.SendFault(e);
    }
}

//Attached Transaction outcome Signals from IEnlistmentNotification.
//Signal from TransactionContext on attached transaction commit.
// Does nothing when a reference on the instance cannot be added; otherwise the reference is
// released again on every path.
public void TransactionCommitted()
{
    if (!this.TryAddReference())
    {
        return;
    }

    try
    {
        bool suspendedByTransactedCancel = this.state == State.Suspended && this.isTransactedCancelled;
        if (suspendedByTransactedCancel || this.state == State.Active)
        {
            bool ownsLock = false;

            // this could race with other commands and may throw exception.
            // treat it as best effort pulse of workflow.
            try
            {
                AcquireLock(this.acquireLockTimeout, ref ownsLock);

                // The state is evaluated again now that the lock is held.
                if ((this.state == State.Suspended && this.isTransactedCancelled) || ValidateStateForRun(null, null))
                {
                    this.isRunnable = true;
                    this.state = State.Active;
                }
            }
            catch (Exception pulseException)
            {
                if (Fx.IsFatal(pulseException))
                {
                    throw;
                }

                FxTrace.Exception.AsWarning(pulseException);
            }
            finally
            {
                ReleaseLock(ref ownsLock);
            }
        }
        else if (this.state == State.Unloaded && this.completionState == ActivityInstanceState.Faulted)
        {
            // the workflow has completed thru transacted Terminate
            try
            {
                OnCompleted();
            }
            catch (Exception completionException)
            {
                if (Fx.IsFatal(completionException))
                {
                    throw;
                }

                this.AbortInstance(completionException, false);
            }
        }
    }
    finally
    {
        this.ReleaseReference();
    }
}

// Transaction has been prepared.
// As far as WorkflowServiceInstance is concerned, it is no longer in a transaction.
public void OnTransactionPrepared()
{
    this.transactionContext = null;
    this.isInTransaction = false;
}

// Aborts the instance with the transaction's exception (second argument false).
public void OnTransactionAbortOrInDoubt(TransactionException exception)
{
    Fx.Assert(exception != null, "Need a valid TransactionException to call this");
    this.AbortInstance(exception, false);
}

// Called under the lock.
// Validates the operation against the supplied ambient transaction
// (ValidateHelper is called with useThreadTransaction = false).
void Validate(string operationName, Transaction ambientTransaction, bool controlEndpoint)
{
    ValidateHelper(operationName, ambientTransaction, false, controlEndpoint);
}

// Throws a FaultException when the operation is not permitted in the current state:
//  - a transaction context is attached and its transaction differs from the caller's
//    (Transaction.Current when useThreadTransaction is set, ambientTransaction otherwise);
//  - the instance is Unloaded (for control endpoints Fx.AssertAndThrow fires first);
//  - the instance is Completed or Aborted;
//  - the instance is Suspended and the operation is not one of Suspend, Unsuspend,
//    Terminate, Cancel or their transacted variants.
void ValidateHelper(string operationName, Transaction ambientTransaction, bool useThreadTransaction, bool controlEndpoint)
{
    //Ensure Instance is usable.
    TransactionContext attachedTransaction = this.transactionContext;
    if (attachedTransaction != null)
    {
        Transaction lockingTransaction = attachedTransaction.CurrentTransaction;
        Transaction callerTransaction = useThreadTransaction ? Transaction.Current : ambientTransaction;
        if (lockingTransaction != callerTransaction)
        {
            throw FxTrace.Exception.AsError(new FaultException(
                OperationExecutionFault.CreateTransactedLockException(this.persistenceContext.InstanceId, operationName)));
        }
    }

    if (controlEndpoint)
    {
        Fx.AssertAndThrow(this.state != State.Unloaded, "Cannot be unloaded");
    }

    switch (this.state)
    {
        case State.Unloaded:
            throw FxTrace.Exception.AsError(new FaultException(
                OperationExecutionFault.CreateInstanceUnloadedFault(SR.ServiceInstanceUnloaded(this.persistenceContext.InstanceId))));

        //Do a fast check to fail fast.
        case State.Completed:
        case State.Aborted:
            throw FxTrace.Exception.AsError(new FaultException(
                OperationExecutionFault.CreateInstanceNotFoundFault(SR.ServiceInstanceTerminated(this.persistenceContext.InstanceId))));

        case State.Suspended:
            {
                bool allowedWhileSuspended =
                    operationName == XD2.WorkflowInstanceManagementService.Suspend
                    || operationName == XD2.WorkflowInstanceManagementService.TransactedSuspend
                    || operationName == XD2.WorkflowInstanceManagementService.Unsuspend
                    || operationName == XD2.WorkflowInstanceManagementService.TransactedUnsuspend
                    || operationName == XD2.WorkflowInstanceManagementService.Terminate
                    || operationName == XD2.WorkflowInstanceManagementService.TransactedTerminate
                    || operationName == XD2.WorkflowInstanceManagementService.Cancel
                    || operationName == XD2.WorkflowInstanceManagementService.TransactedCancel;
                if (!allowedWhileSuspended)
                {
                    throw FxTrace.Exception.AsError(new FaultException(
                        OperationExecutionFault.CreateSuspendedFault(this.Id, operationName)));
                }
                break;
            }
    }
}

// Already done under the scope of a lock. No additional locking needed here.
// Decrements the service host's busy count if this instance had incremented it,
// tracing the decrement when that trace is enabled.
void DecrementBusyCount()
{
    if (!this.hasIncrementedBusyCount)
    {
        return;
    }

    this.serviceHost.DecrementBusyCount();
    if (AspNetEnvironment.Current.TraceDecrementBusyCountIsEnabled())
    {
        AspNetEnvironment.Current.TraceDecrementBusyCount(SR.BusyCountTraceFormatString(this.Id));
    }
    this.hasIncrementedBusyCount = false;
}

// Already done under the scope of a lock. No additional locking needed here.
// Increments the service host's busy count unless this instance has already done so,
// tracing the increment when that trace is enabled.
void IncrementBusyCount()
{
    if (this.hasIncrementedBusyCount)
    {
        return;
    }

    this.serviceHost.IncrementBusyCount();
    if (AspNetEnvironment.Current.TraceIncrementBusyCountIsEnabled())
    {
        AspNetEnvironment.Current.TraceIncrementBusyCount(SR.BusyCountTraceFormatString(this.Id));
    }
    this.hasIncrementedBusyCount = true;
}


// States of the service instance.
enum State
{
    Active, //Default.
    Aborted,
    Suspended,
    Completed,
    Unloaded
};

// Async result behind BeginReleaseInstance / EndReleaseInstance.
class ReleaseInstanceAsyncResult : AsyncResult
{
    // Delegates without an initializer are created on first use by the methods that need them.
    static AsyncCompletion handleEndUnload;
    static Action<AsyncResult, Exception> onCompleting = Finally;
    static FastAsyncCallback lockAcquiredCallback = OnLockAcquired;
    static FastAsyncCallback acquireCompletedCallback = AcquireCompletedCallback;
    static AsyncCompletion onReleasePersistenceContext;
    static AsyncCompletion onClosePersistenceContext;

    WorkflowServiceInstance workflowInstance;
    TimeoutHelper timeoutHelper;
    bool isTryUnload;
    bool ownsLock;
    bool referenceAcquired;

    // Captures the arguments, registers Finally as the OnCompleting handler and starts the
    // operation. When starting throws a non-fatal exception, Finally is run for cleanup from
    // the finally block and the exception still propagates to the caller.
    public ReleaseInstanceAsyncResult(WorkflowServiceInstance workflowServiceInstance,
        bool isTryUnload, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.workflowInstance = workflowServiceInstance;
        this.isTryUnload = isTryUnload;
        this.timeoutHelper = new TimeoutHelper(timeout);
        this.OnCompleting = onCompleting;

        Exception startException = null;
        bool completedSynchronously = false;
        try
        {
            completedSynchronously = TryAcquire();
        }
        catch (Exception exception)
        {
            if (Fx.IsFatal(exception))
            {
                throw;
            }

            startException = exception;
            throw;
        }
        finally
        {
            if (startException != null)
            {
                Finally(this, startException);
            }
        }

        if (completedSynchronously)
        {
            this.Complete(true);
        }
    }

    // Completes the async result created by the constructor.
    public static void End(IAsyncResult result)
    {
        AsyncResult.End<ReleaseInstanceAsyncResult>(result);
    }

// Members of ReleaseInstanceAsyncResult; the class declaration begins earlier in the file.
    // Tries to enter the instance's acquireReferenceSemaphore. When EnterAsync returns true the
    // operation continues inline; otherwise false is returned and acquireCompletedCallback,
    // which was handed to EnterAsync, is expected to continue it.
    bool TryAcquire()
    {
        return this.workflowInstance.acquireReferenceSemaphore.EnterAsync(timeoutHelper.RemainingTime(), acquireCompletedCallback, this)
            && this.HandleEndAcquireReference();
    }

    // Records that the semaphore is held, then either only releases the persistence context
    // (when hasPersistedDeleted is set) or performs the unload first.
    bool HandleEndAcquireReference()
    {
        this.referenceAcquired = true;
        return this.workflowInstance.hasPersistedDeleted
            ? this.LockAndReleasePersistenceContext()
            : this.ReleaseInstance();
    }

    // Callback handed to EnterAsync. Continues the operation when no exception was reported
    // and completes the result (asynchronously) unless the continuation is still pending.
    static void AcquireCompletedCallback(object state, Exception completionException)
    {
        ReleaseInstanceAsyncResult thisPtr = (ReleaseInstanceAsyncResult)state;

        bool shouldComplete = true;
        if (completionException == null)
        {
            try
            {
                shouldComplete = thisPtr.HandleEndAcquireReference();
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }

                completionException = exception;
            }
        }

        if (shouldComplete)
        {
            thisPtr.Complete(false, completionException);
        }
    }

    // Starts the unload (or try-unload). A FaultException recognized by
    // OperationExecutionFault.IsAbortedFaultException is traced as a warning and treated as
    // completion; any other FaultException is rethrown.
    bool ReleaseInstance()
    {
        if (handleEndUnload == null)
        {
            handleEndUnload = new AsyncCompletion(HandleEndUnload);
        }

        IAsyncResult unloadResult;
        try
        {
            TimeSpan remainingTime = timeoutHelper.RemainingTime();
            AsyncCallback unloadCallback = this.PrepareAsyncCompletion(handleEndUnload);
            unloadResult = this.isTryUnload
                ? this.BeginTryUnload(remainingTime, unloadCallback, this)
                : this.BeginUnload(remainingTime, unloadCallback, this);
        }
        catch (FaultException faultException)
        {
            if (!OperationExecutionFault.IsAbortedFaultException(faultException))
            {
                throw;
            }

            FxTrace.Exception.AsWarning(faultException);
            return true;
        }

        return unloadResult.CompletedSynchronously && HandleEndUnload(unloadResult);
    }

    // Starts an Unload through UnloadOrPersistAsyncResult; the two boolean arguments are false, false.
    IAsyncResult BeginUnload(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return new UnloadOrPersistAsyncResult(this.workflowInstance, PersistenceOperation.Unload, false, false, timeout, callback, state);
    }

    // Completes BeginUnload; the value returned by UnloadOrPersistAsyncResult.End is discarded.
    void EndUnload(IAsyncResult result)
    {
        UnloadOrPersistAsyncResult.End(result);
    }

    // Same as BeginUnload except that the second boolean argument is true
    // (presumably the "try" flag - confirm against UnloadOrPersistAsyncResult).
    IAsyncResult BeginTryUnload(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return new UnloadOrPersistAsyncResult(this.workflowInstance, PersistenceOperation.Unload, false, true, timeout, callback, state);
    }

    // Completes BeginTryUnload and returns the value produced by UnloadOrPersistAsyncResult.End.
    bool EndTryUnload(IAsyncResult result)
    {
        return UnloadOrPersistAsyncResult.End(result);
    }

    // Ends the unload. After a successful unload the persistence context is released; otherwise
    // (try-unload returned false, or an aborted fault was swallowed) the operation is complete.
    static bool HandleEndUnload(IAsyncResult result)
    {
        ReleaseInstanceAsyncResult thisPtr = (ReleaseInstanceAsyncResult)result.AsyncState;

        bool unloaded = false;
        try
        {
            if (thisPtr.isTryUnload)
            {
                // if EndTryUnload returns false, then we need to revert our changes
                unloaded = thisPtr.EndTryUnload(result);
            }
            else
            {
                thisPtr.EndUnload(result);
                unloaded = true;
            }
        }
        catch (FaultException faultException)
        {
            if (!OperationExecutionFault.IsAbortedFaultException(faultException))
            {
                throw;
            }

            FxTrace.Exception.AsWarning(faultException);
        }

        if (!unloaded)
        {
            return true;
        }

        return thisPtr.LockAndReleasePersistenceContext();
    }

    // Acquires the instance lock and releases the persistence context. When AcquireLockAsync
    // returns false, lockAcquiredCallback continues the work later. The lock is released here
    // unless the release of the persistence context is still pending.
    bool LockAndReleasePersistenceContext()
    {
        if (!this.workflowInstance.AcquireLockAsync(this.timeoutHelper.RemainingTime(), ref this.ownsLock, lockAcquiredCallback, this))
        {
            return false;
        }

        bool finished = true;
        try
        {
            finished = this.ReleasePersistenceContext();
        }
        finally
        {
            if (finished)
            {
                this.workflowInstance.ReleaseLock(ref this.ownsLock);
            }
        }

        return finished;
    }

    // Callback handed to AcquireLockAsync. Completes with the reported exception, if any;
    // otherwise marks the lock as owned and runs the same release logic as
    // LockAndReleasePersistenceContext, completing the result when nothing is pending.
    static void OnLockAcquired(object state, Exception asyncException)
    {
        ReleaseInstanceAsyncResult thisPtr = (ReleaseInstanceAsyncResult)state;

        if (asyncException != null)
        {
            thisPtr.Complete(false, asyncException);
            return;
        }

        thisPtr.ownsLock = true;

        Exception failure = null;
        bool finished = true;

        try
        {
            finished = thisPtr.ReleasePersistenceContext();
        }
        catch (Exception exception)
        {
            if (Fx.IsFatal(exception))
            {
                throw;
            }

            failure = exception;
        }
        finally
        {
            if (finished)
            {
                thisPtr.workflowInstance.ReleaseLock(ref thisPtr.ownsLock);
            }
        }

        if (finished)
        {
            thisPtr.Complete(false, failure);
        }
    }

    // Starts releasing the persistence context using the instance's persistTimeout.
    // Nothing is done (and true is returned) unless the context is in the Opened state.
    bool ReleasePersistenceContext()
    {
        if (this.workflowInstance.persistenceContext.State != CommunicationState.Opened)
        {
            return true;
        }

        if (onReleasePersistenceContext == null)
        {
            onReleasePersistenceContext = new AsyncCompletion(OnReleasePersistenceContext);
        }

        return SyncContinue(
            this.workflowInstance.persistenceContext.BeginRelease(
                this.workflowInstance.persistTimeout,
                PrepareAsyncCompletion(onReleasePersistenceContext),
                this));
    }

// Remaining members of ReleaseInstanceAsyncResult; the class declaration begins earlier in the file.
    // Ends the release of the persistence context and starts closing it with the time that remains.
    static bool OnReleasePersistenceContext(IAsyncResult result)
    {
        ReleaseInstanceAsyncResult thisPtr = (ReleaseInstanceAsyncResult)result.AsyncState;
        thisPtr.workflowInstance.persistenceContext.EndRelease(result);

        if (onClosePersistenceContext == null)
        {
            onClosePersistenceContext = new AsyncCompletion(OnClosePersistenceContext);
        }

        return thisPtr.SyncContinue(
            thisPtr.workflowInstance.persistenceContext.BeginClose(
                thisPtr.timeoutHelper.RemainingTime(),
                thisPtr.PrepareAsyncCompletion(onClosePersistenceContext),
                thisPtr));
    }

    // Ends the close of the persistence context and disposes the workflow instance.
    static bool OnClosePersistenceContext(IAsyncResult result)
    {
        ReleaseInstanceAsyncResult thisPtr = (ReleaseInstanceAsyncResult)result.AsyncState;
        thisPtr.workflowInstance.persistenceContext.EndClose(result);
        thisPtr.workflowInstance.Dispose();
        return true;
    }

    // OnCompleting handler (also called directly by the constructor on failure).
    // Aborts the instance on a non-fatal completion exception, then releases the lock if it
    // is owned, then exits the reference semaphore if it was entered. The nested finally
    // blocks make each later step run even when an earlier one throws.
    static void Finally(AsyncResult result, Exception completionException)
    {
        ReleaseInstanceAsyncResult thisPtr = (ReleaseInstanceAsyncResult)result;
        try
        {
            try
            {
                bool shouldAbort = completionException != null && !Fx.IsFatal(completionException);
                if (shouldAbort)
                {
                    thisPtr.workflowInstance.AbortInstance(completionException, thisPtr.ownsLock);
                }
            }
            finally
            {
                if (thisPtr.ownsLock)
                {
                    thisPtr.workflowInstance.ReleaseLock(ref thisPtr.ownsLock);
                }
            }
        }
        finally
        {
            if (thisPtr.referenceAcquired)
            {
                thisPtr.workflowInstance.acquireReferenceSemaphore.Exit();
                thisPtr.referenceAcquired = false;
            }
        }
    }
}

// Async result behind BeginTryAcquireReference / EndTryAcquireReference: enters the instance's
// acquireReferenceSemaphore, calls TryAddReference while holding it, and exits the semaphore.
class TryAcquireReferenceAsyncResult : AsyncResult
{
    static FastAsyncCallback acquireCompletedCallback = AcquireCompletedCallback;

    WorkflowServiceInstance instance;
    TimeoutHelper timeoutHelper;

    // Outcome of TryAddReference; returned by End.
    bool referenceAdded;

    public TryAcquireReferenceAsyncResult(WorkflowServiceInstance instance, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.instance = instance;
        this.timeoutHelper = new TimeoutHelper(timeout);

        if (TryAcquire())
        {
            this.Complete(true);
        }
    }

    // Completes the async result and returns whether the reference was added.
    public static bool End(IAsyncResult result)
    {
        TryAcquireReferenceAsyncResult completed = AsyncResult.End<TryAcquireReferenceAsyncResult>(result);
        return completed.referenceAdded;
    }

    // Returns true when the semaphore was entered synchronously and the reference attempt has
    // already run; false when acquireCompletedCallback will finish the work.
    bool TryAcquire()
    {
        if (!this.instance.acquireReferenceSemaphore.EnterAsync(timeoutHelper.RemainingTime(), acquireCompletedCallback, this))
        {
            return false;
        }

        this.HandleEndAcquireReference();
        return true;
    }

    // Attempts to add the reference and always exits the semaphore afterwards.
    void HandleEndAcquireReference()
    {
        try
        {
            this.referenceAdded = this.instance.TryAddReference();
        }
        finally
        {
            this.instance.acquireReferenceSemaphore.Exit();
        }
    }

    // Callback handed to EnterAsync. Runs the reference attempt when no exception was reported
    // and always completes the result (asynchronously) with whatever exception is left.
    static void AcquireCompletedCallback(object state, Exception completionException)
    {
        TryAcquireReferenceAsyncResult thisPtr = (TryAcquireReferenceAsyncResult)state;

        if (completionException == null)
        {
            try
            {
                thisPtr.HandleEndAcquireReference();
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }

                completionException = exception;
            }
        }

        thisPtr.Complete(false, completionException);
    }
}

// Async result for one queued operation of a session (see BeginWaitForPendingOperations).
class PendingOperationAsyncResult : AsyncResult
{
    static Action<object, TimeoutException> handleEndWait = new Action<object, TimeoutException>(HandleEndWait);

    // Created by the constructor only when the request is not the first one.
    AsyncWaitHandle waitHandle;
    bool isFirstRequest;
    TimeSpan timeout;

// Members of PendingOperationAsyncResult; the class declaration begins earlier in the file.
    // A request that is not the first one gets a manual-reset wait handle to wait on;
    // the first request needs none.
    public PendingOperationAsyncResult(bool isFirstRequest, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.isFirstRequest = isFirstRequest;
        this.timeout = timeout;

        if (!isFirstRequest)
        {
            this.waitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
        }
    }

    // Completes synchronously for the first request, or when WaitAsync returns true;
    // otherwise handleEndWait completes the result later.
    public void Start()
    {
        if (!this.isFirstRequest)
        {
            Fx.Assert(this.waitHandle != null, "waitHandle should not be null if the request is not the first");
            if (!this.waitHandle.WaitAsync(handleEndWait, this, this.timeout))
            {
                return;
            }
        }

        Complete(true);
    }

    // Callback handed to WaitAsync: completes the result asynchronously, passing along the
    // TimeoutException argument (which may be null).
    static void HandleEndWait(object state, TimeoutException e)
    {
        PendingOperationAsyncResult thisPtr = (PendingOperationAsyncResult)state;
        thisPtr.Complete(false, e);
    }

    // Sets the wait handle, if there is one; a first request has none and is left alone.
    public void Unblock()
    {
        if (this.waitHandle == null)
        {
            return;
        }

        this.waitHandle.Set();
    }

    // Completes the async result.
    public static void End(IAsyncResult result)
    {
        AsyncResult.End<PendingOperationAsyncResult>(result);
    }

}

// Async result behind BeginAssociateInfrastructureKeys / EndAssociateInfrastructureKeys:
// acquires the instance lock on idle, validates the state and associates the keys through
// the persistence context.
class AssociateKeysAsyncResult : TransactedAsyncResult
{
    static AsyncCompletion handleLockAcquired = new AsyncCompletion(HandleLockAcquired);
    static AsyncCompletion handleAssociateInfrastructureKeys = new AsyncCompletion(HandleAssociateInfrastructureKeys);
    static Action<AsyncResult, Exception> onCompleting = new Action<AsyncResult, Exception>(Finally);

    readonly WorkflowServiceInstance workflow;
    readonly ICollection<InstanceKey> associatedKeys;
    readonly TimeoutHelper timeoutHelper;
    readonly Transaction transaction;
    bool ownsLock;

    // Captures the arguments, registers Finally as the OnCompleting handler and starts by
    // acquiring the lock on idle.
    public AssociateKeysAsyncResult(WorkflowServiceInstance workflow, ICollection<InstanceKey> associatedKeys, Transaction transaction,
        TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        Fx.Assert(associatedKeys != null && associatedKeys.Count > 0, "Must have associatedKeys!");
        this.workflow = workflow;
        this.associatedKeys = associatedKeys;
        this.transaction = transaction;
        this.timeoutHelper = new TimeoutHelper(timeout);
        this.OnCompleting = onCompleting;

        IAsyncResult lockResult = this.workflow.BeginAcquireLockOnIdle(
            this.timeoutHelper.RemainingTime(),
            ref this.ownsLock,
            PrepareAsyncCompletion(handleLockAcquired),
            this);

        if (SyncContinue(lockResult))
        {
            Complete(true);
        }
    }

    // Completes the async result.
    public static void End(IAsyncResult result)
    {
        AsyncResult.End<AssociateKeysAsyncResult>(result);
    }

    // Ends the lock acquisition. Only the asynchronous path passes ownsLock to
    // EndAcquireLockOnIdle; on the synchronous path the Begin call already received it by ref.
    // Then validates the state and starts the key association.
    static bool HandleLockAcquired(IAsyncResult result)
    {
        AssociateKeysAsyncResult thisPtr = (AssociateKeysAsyncResult)result.AsyncState;

        if (!result.CompletedSynchronously)
        {
            thisPtr.workflow.EndAcquireLockOnIdle(result, ref thisPtr.ownsLock);
        }
        else
        {
            thisPtr.workflow.EndAcquireLockOnIdle(result);
        }

        thisPtr.workflow.ValidateStateForAssociateKeys();
        return thisPtr.AssociateKeys();
    }

    // Begins the association inside the scope returned by PrepareTransactionalCall for the
    // captured transaction, using the instance's persistTimeout.
    bool AssociateKeys()
    {
        IAsyncResult associateResult;
        using (PrepareTransactionalCall(this.transaction))
        {
            associateResult = this.workflow.persistenceContext.BeginAssociateInfrastructureKeys(
                this.associatedKeys,
                this.workflow.persistTimeout,
                PrepareAsyncCompletion(handleAssociateInfrastructureKeys),
                this);
        }

        return SyncContinue(associateResult);
    }

// Continuation after BeginAssociateInfrastructureKeys: ends the call and releases the execution lock.
            static bool HandleAssociateInfrastructureKeys(IAsyncResult result)
            {
                AssociateKeysAsyncResult thisPtr = (AssociateKeysAsyncResult)result.AsyncState;
                thisPtr.workflow.persistenceContext.EndAssociateInfrastructureKeys(result);
                // Proper release lock
                thisPtr.workflow.ReleaseLock(ref thisPtr.ownsLock);
                return true;
            }

            // OnCompleting hook: releases the lock if a failure left it held.
            static void Finally(AsyncResult result, Exception completionException)
            {
                AssociateKeysAsyncResult thisPtr = (AssociateKeysAsyncResult)result;
                // Fallback for exception
                if (thisPtr.ownsLock)
                {
                    thisPtr.workflow.ReleaseLock(ref thisPtr.ownsLock);
                }
            }
        }

        // Async result that resumes a bookmark on the instance. When isResumeProtocolBookmark is true the
        // lock is acquired and the bookmark resumed directly; otherwise a reference to the instance is
        // acquired first and the operation waits until the instance reports itself ready.
        // End returns the BookmarkResumptionResult recorded during the operation.
        class ResumeProtocolBookmarkAsyncResult : AsyncResult
        {
            // Created lazily on first use.
            static Action<object, TimeoutException> nextIdleCallback;
            static Action<object, TimeoutException> workflowServiceInstanceReadyCallback;

            static Action<AsyncResult, Exception> onCompleting = new Action<AsyncResult, Exception>(Finally);
            static AsyncCompletion handleEndTrack = new AsyncCompletion(HandleEndTrack);
            static AsyncCompletion handleEndLockAcquired = new AsyncCompletion(HandleEndLockAcquired);
            static AsyncCompletion handleEndReferenceAcquired = new AsyncCompletion(HandleEndReferenceAcquired);

            WorkflowServiceInstance instance;
            Bookmark bookmark;
            object value;
            BookmarkScope bookmarkScope;
            TimeoutHelper timeoutHelper;
            TimeoutHelper nextIdleTimeoutHelper;
            AsyncWaitHandle waitHandle;
            bool ownsLock;
            BookmarkResumptionResult resumptionResult;
            bool isResumeProtocolBookmark;
            bool referenceAcquired;

            public ResumeProtocolBookmarkAsyncResult(WorkflowServiceInstance instance, Bookmark bookmark, object value, BookmarkScope bookmarkScope, bool isResumeProtocolBookmark, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.instance = instance;
                this.bookmark = bookmark;
                this.value = value;
                this.bookmarkScope = bookmarkScope;
                this.timeoutHelper = new TimeoutHelper(timeout);
                // The value for WorkflowServiceHost.FilterResumeTimeout comes from the AppSetting
                // "microsoft:WorkflowServices:FilterResumeTimeoutInSeconds"
                this.nextIdleTimeoutHelper = new TimeoutHelper(instance.serviceHost.FilterResumeTimeout);
                this.isResumeProtocolBookmark = isResumeProtocolBookmark;
                this.OnCompleting = onCompleting;

                Exception completionException = null;
                bool completeSelf = true;

                try
                {
                    if (this.isResumeProtocolBookmark)
                    {
                        completeSelf = DoResumeBookmark();
                    }
                    else
                    {
                        completeSelf = WaitForInstanceToBeReady();
                    }
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }

                    completionException = e;
                }

                if (completeSelf)
                {
                    this.Complete(true, completionException);
                }
            }

            // Begins acquiring the execution lock (on idle); continues in HandleEndLockAcquired.
            bool DoResumeBookmark()
            {
                IAsyncResult result = this.instance.BeginAcquireLockOnIdle(timeoutHelper.RemainingTime(), ref this.ownsLock, PrepareAsyncCompletion(handleEndLockAcquired), this);
                return SyncContinue(result);
            }

            // Begins acquiring a reference on the instance; continues in HandleEndReferenceAcquired.
            bool WaitForInstanceToBeReady()
            {
                IAsyncResult result = this.instance.BeginTryAcquireReference(timeoutHelper.RemainingTime(), PrepareAsyncCompletion(handleEndReferenceAcquired), this);
                return SyncContinue(result);
            }

            // If the reference was obtained, waits for the ready signal; otherwise completes with NotReady.
            static bool HandleEndReferenceAcquired(IAsyncResult result)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = (ResumeProtocolBookmarkAsyncResult)result.AsyncState;
                thisPtr.referenceAcquired = thisPtr.instance.EndTryAcquireReference(result);
                if (thisPtr.referenceAcquired)
                {
                    return thisPtr.WaitToBeSignaled();
                }
                else
                {
                    thisPtr.resumptionResult = BookmarkResumptionResult.NotReady;
                    return true;
                }
            }

            // Resumes immediately when the instance is already ready; otherwise waits asynchronously on the
            // instance's ready wait handle (created here under thisLock if it does not exist yet).
            bool WaitToBeSignaled()
            {
                bool needToWait = false;

                lock (this.instance.thisLock)
                {
                    if (!this.instance.isWorkflowServiceInstanceReady)
                    {
                        needToWait = true;
                        if (this.instance.workflowServiceInstanceReadyWaitHandle == null)
                        {
                            this.instance.workflowServiceInstanceReadyWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
                        }
                    }
                }

                if (needToWait)
                {
                    if (workflowServiceInstanceReadyCallback == null)
                    {
                        workflowServiceInstanceReadyCallback = new Action<object, TimeoutException>(OnSignaled);
                    }

                    if (this.instance.workflowServiceInstanceReadyWaitHandle.WaitAsync(workflowServiceInstanceReadyCallback, this, this.timeoutHelper.RemainingTime()))
                    {
                        // The wait was satisfied synchronously.
                        return DoResumeBookmark();
                    }
                    else
                    {
                        // OnSignaled continues the operation.
                        return false;
                    }
                }
                else
                {
                    return DoResumeBookmark();
                }
            }

            // Callback for the ready wait handle: completes with the timeout exception, or continues the resume.
            static void OnSignaled(object state, TimeoutException exception)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = (ResumeProtocolBookmarkAsyncResult)state;
                if (exception != null)
                {
                    thisPtr.Complete(false, exception);
                    return;
                }

                bool completeSelf = false;
                Exception completionException = null;

                try
                {
                    completeSelf = thisPtr.DoResumeBookmark();
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    completionException = e;
                }

                // completeSelf is still false when an exception was caught, so exactly one branch completes.
                if (completionException != null)
                {
                    thisPtr.Complete(false, completionException);
                }
                else if (completeSelf)
                {
                    thisPtr.Complete(false);
                }
            }

            // Ends the operation and returns the recorded resumption result.
            public static BookmarkResumptionResult End(IAsyncResult result)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = AsyncResult.End<ResumeProtocolBookmarkAsyncResult>(result);
                return thisPtr.resumptionResult;
            }

            // Continuation after the lock acquisition finished: performs the resumption.
            static bool HandleEndLockAcquired(IAsyncResult result)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = (ResumeProtocolBookmarkAsyncResult)result.AsyncState;
                if (result.CompletedSynchronously)
                {
                    // Sync handoff: ownsLock was already set through the ref argument of the Begin call.
                    thisPtr.instance.EndAcquireLockOnIdle(result);
                }
                else
                {
                    // Async handoff: End reports lock ownership through ownsLock.
                    thisPtr.instance.EndAcquireLockOnIdle(result, ref thisPtr.ownsLock);
                }
                return thisPtr.PerformResumption();
            }

            // Resumes the bookmark, retrying after each wait while the result is NotReady
            // (only when buffered receive is off and FilterResumeTimeout is positive).
            // Returns false when the wait went async (OnNextIdle continues).
            bool PerformResumption()
            {
                // We always have the lock when entering this method

                bool waitFinishedSynchronously;
                bool completeSelf = false;

                // For ProtocolBookmark without Out-Of-Order messaging support, we will throw and
                // propagate Fault to client in case of invalid state (similar to management commands).
                // Otherwise, the result consistent with WorkflowApplication will be returned and
                // the caller (eg. delay extension or OOM) needs to handle them accordingly.
                if (this.isResumeProtocolBookmark && this.instance.BufferedReceiveManager == null)
                {
                    this.instance.ValidateStateForResumeProtocolBookmark();
                }
                else
                {
                    if (this.instance.AreBookmarksInvalid(out this.resumptionResult))
                    {
                        return TrackPerformResumption(true);
                    }
                }

                do
                {
                    waitFinishedSynchronously = false;

                    bool bufferedReceiveEnabled = this.isResumeProtocolBookmark && this.instance.BufferedReceiveManager != null;
                    this.resumptionResult = this.instance.ResumeProtocolBookmarkCore(this.bookmark, this.value, this.bookmarkScope, bufferedReceiveEnabled, ref this.waitHandle, ref this.ownsLock);
                    if (this.resumptionResult == BookmarkResumptionResult.NotReady && !bufferedReceiveEnabled && (this.instance.serviceHost.FilterResumeTimeout.TotalSeconds > 0))
                    {
                        if (nextIdleCallback == null)
                        {
                            nextIdleCallback = new Action<object, TimeoutException>(OnNextIdle);
                        }

                        if (this.waitHandle.WaitAsync(nextIdleCallback, this, !this.isResumeProtocolBookmark ? this.timeoutHelper.RemainingTime() : this.nextIdleTimeoutHelper.RemainingTime()))
                        {
                            // We now have the lock
                            this.ownsLock = true;

                            // We should retry the resumption synchronously
                            waitFinishedSynchronously = true;
                        }
                        else
                        {
                            return false;
                        }
                    }
                    else
                    {
                        completeSelf = true;
                        break;
                    }
                }
                while (waitFinishedSynchronously);

                return TrackPerformResumption(completeSelf);
            }

            // Flushes pending tracking records, if any, before the operation completes.
            bool TrackPerformResumption(bool completeSelf)
            {
                if (this.instance.Controller.HasPendingTrackingRecords)
                {
                    Fx.Assert(completeSelf, "CompleteSelf should be true at this point.");

                    IAsyncResult result = this.instance.Controller.BeginFlushTrackingRecords(this.instance.trackTimeout, PrepareAsyncCompletion(handleEndTrack), this);
                    completeSelf = SyncContinue(result);
                }

                return completeSelf;
            }

            // Continuation after the tracking flush: releases the lock and the instance reference if held.
            static bool HandleEndTrack(IAsyncResult result)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = (ResumeProtocolBookmarkAsyncResult)result.AsyncState;
                thisPtr.instance.Controller.EndFlushTrackingRecords(result);

                if (thisPtr.ownsLock)
                {
                    thisPtr.instance.ReleaseLock(ref thisPtr.ownsLock);
                }
                if (thisPtr.referenceAcquired)
                {
                    thisPtr.instance.ReleaseReference();
                    thisPtr.referenceAcquired = false;
                }
                return true;
            }

            // Callback for the idle wait handle started in PerformResumption.
            static void OnNextIdle(object state, TimeoutException asyncException)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = (ResumeProtocolBookmarkAsyncResult)state;

                if (asyncException != null)
                {
                    lock (thisPtr.instance.activeOperationsLock)
                    {
                        // If the waitHandle is not in either of these lists then it must have
                        // been removed by the Set() path - that means we've got the lock, so let's
                        // just run with it (IE - swallow the exception).
                        if (thisPtr.instance.nextIdleWaiters.Remove(thisPtr.waitHandle) || thisPtr.instance.idleWaiters.Remove(thisPtr.waitHandle))
                        {
                            thisPtr.Complete(false, asyncException);
                            return;
                        }
                    }
                }

                thisPtr.ownsLock = true;

                bool completeSelf = true;
                Exception completionException = null;

                try
                {
                    completeSelf = thisPtr.PerformResumption();
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }

                    completionException = e;
                }
                finally
                {
                    if (completeSelf)
                    {
                        thisPtr.instance.ReleaseLock(ref thisPtr.ownsLock);
                    }
                }

                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }

            // OnCompleting hook: releases the lock if still owned, then always releases the instance reference if held.
            static void Finally(AsyncResult result, Exception completionException)
            {
                ResumeProtocolBookmarkAsyncResult thisPtr = (ResumeProtocolBookmarkAsyncResult)result;
                try
                {
                    if (thisPtr.ownsLock)
                    {
                        thisPtr.instance.ReleaseLock(ref thisPtr.ownsLock);
                    }
                }
                finally
                {
                    if (thisPtr.referenceAcquired)
                    {
                        thisPtr.instance.ReleaseReference();
                        thisPtr.referenceAcquired = false;
                    }
                }
            }
        }

        // Async result that performs a Save, Unload or Delete persistence operation for the instance.
        // End returns tryResult, which is set to false when an isTry operation could not proceed.
        class UnloadOrPersistAsyncResult : TransactedAsyncResult
        {
            static FastAsyncCallback lockAcquiredCallback = new FastAsyncCallback(OnLockAcquired);
            static AsyncCompletion persistedCallback = new AsyncCompletion(OnPersisted);
            static AsyncCompletion savedCallback = new AsyncCompletion(OnSaved);
            static AsyncCompletion waitForCanPersistCallback = new AsyncCompletion(OnWaitForCanPersist);
            static AsyncCompletion providerOpenedCallback = new AsyncCompletion(OnProviderOpened);
            static AsyncCompletion outermostCallback = new AsyncCompletion(OutermostCallback);
            static AsyncCompletion trackingCompleteCallback = new
AsyncCompletion(OnTrackingComplete);
            static AsyncCompletion completeContextCallback = new AsyncCompletion(OnCompleteContext);
            static AsyncCompletion notifyCompletionCallback = new AsyncCompletion(OnNotifyCompletion);
            static Action<AsyncResult, Exception> completeCallback = new Action<AsyncResult, Exception>(OnComplete);

            WorkflowServiceInstance instance;
            bool isUnloaded;
            SaveStatus saveStatus;
            TimeoutHelper timeoutHelper;
            PersistenceOperation operation;
            WorkflowPersistenceContext context;
            AsyncCompletion nextInnerAsyncCompletion;
            IDictionary<XName, InstanceValue> data;
            PersistencePipeline pipeline;
            bool ownsLock;
            bool isWorkflowThread;
            bool isTry;
            bool tryResult;
            bool updateState;
            bool isCompletionTransactionRequired;
            DependentTransaction dependentTransaction;
            bool isIdlePolicyPersist;
            long startTime;

            public UnloadOrPersistAsyncResult(WorkflowServiceInstance instance, PersistenceOperation operation,
                bool isWorkflowThread, bool isTry, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                // The isTry flag is only true when this is an idle policy initiated persist/unload.

                Fx.Assert((isWorkflowThread && !isTry) || !isWorkflowThread, "Either we're the workflow thread and NOT a try or we're not a workflow thread.");

                this.instance = instance;
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.operation = operation;
                this.isWorkflowThread = isWorkflowThread;
                this.isTry = isTry;
                this.tryResult = true;
                this.isUnloaded = (operation == PersistenceOperation.Unload || operation == PersistenceOperation.Delete);
                this.saveStatus = SaveStatus.Locked;
                this.isCompletionTransactionRequired = this.isUnloaded && instance.Controller.State == WorkflowInstanceState.Complete &&
                    instance.creationContext != null && instance.creationContext.IsCompletionTransactionRequired;
                this.isIdlePolicyPersist = isTry && operation == PersistenceOperation.Save;

                if (operation == PersistenceOperation.Unload)
                {
                    this.saveStatus = SaveStatus.Unlocked;
                }
                else if (operation == PersistenceOperation.Delete)
                {
                    this.saveStatus = SaveStatus.Completed;
                }
                else if (operation == PersistenceOperation.Save)
                {
                    // Start timestamp used by GetDuration for the persist-duration performance counter.
                    SetStartTime();
                }

                // Save off the current transaction in case we have an async operation before we end up creating
                // the WorkflowPersistenceContext and create it on another thread. Do a simple clone here to prevent
                // the object referenced by Transaction.Current from disposing before we get around to referencing it
                // when we create the WorkflowPersistenceContext.
                //
                // This will throw TransactionAbortedException by design, if the transaction is already rolled back.
                Transaction currentTransaction = Transaction.Current;
                if (currentTransaction != null)
                {
                    OnCompleting = UnloadOrPersistAsyncResult.completeCallback;
                    this.dependentTransaction = currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                }

                bool completeSelf = true;
                bool success = false;
                try
                {
                    if (this.isWorkflowThread)
                    {
                        Fx.Assert(this.instance.Controller.IsPersistable, "The runtime won't schedule this work item unless we've passed the guard");

                        // We're an internal persistence on the workflow thread which means
                        // that we are past the guard already, we have the lock, and we know
                        // we aren't detached.

                        completeSelf = OpenProvider();
                    }
                    else
                    {
                        try
                        {
                            completeSelf = LockAndPassGuard();
                        }
                        finally
                        {
                            if (completeSelf)
                            {
                                Fx.Assert(!this.isWorkflowThread, "We should never be calling ReleaseLock if this is the workflow thread.");

                                this.instance.ReleaseLock(ref this.ownsLock, this.isIdlePolicyPersist && this.tryResult);
                            }
                        }
                    }
                    success = true;
                }
                finally
                {
                    if (!success)
                    {
                        // The constructor is throwing, so OnComplete will never run; complete the clone here.
                        if (this.dependentTransaction != null)
                        {
                            this.dependentTransaction.Complete();
                        }
                    }
                }

                if (completeSelf)
                {
                    Complete(true);
                }
            }

            [Fx.Tag.SecurityNote(Critical = "Critical because it accesses UnsafeNativeMethods.QueryPerformanceCounter.",
                Safe = "Safe because we only make the call if PartialTrustHelper.AppDomainFullyTrusted is true.")]
            [SecuritySafeCritical]
            void SetStartTime()
            {
                // startTime becomes -1 when the counter query fails, so GetDuration can detect it.
                if (PartialTrustHelpers.AppDomainFullyTrusted && UnsafeNativeMethods.QueryPerformanceCounter(out this.startTime) == 0)
                {
                    this.startTime = -1;
                }
            }

            // Acquires the execution lock; if obtained synchronously, proceeds to the guard checks.
            bool LockAndPassGuard()
            {
                if (this.instance.AcquireLockAsync(this.timeoutHelper.RemainingTime(), ref this.ownsLock, lockAcquiredCallback, this))
                {
                    return PassGuard();
                }

                return false;
            }

            // Validates instance state, then proceeds when the instance is persistable. Otherwise a try
            // operation gives up (tryResult = false) and any other operation waits until persistence is allowed.
            bool PassGuard()
            {
                if (this.operation == PersistenceOperation.Unload)
                {
                    if (!this.instance.ValidateStateForUnload())
                    {
                        return true;
                    }
                }
                else
                {
                    this.instance.ValidateStateForPersist();
                }

                if (this.instance.Controller.IsPersistable)
                {
                    return OpenProvider();
                }
                else
                {
                    if (this.isTry)
                    {
                        this.tryResult = false;
                        return true;
                    }

                    IAsyncResult result = this.instance.BeginWaitForCanPersist(ref this.ownsLock, this.timeoutHelper.RemainingTime(),
                        PrepareInnerAsyncCompletion(waitForCanPersistCallback), this);
                    if (result.CompletedSynchronously)
                    {
                        return OnWaitForCanPersist(result);
                    }
                    else
                    {
                        return false;
                    }
                }
            }

            [Fx.Tag.SecurityNote(Critical = "Critical because it accesses UnsafeNativeMethods.QueryPerformanceCounter.",
                Safe = "Safe because we only make the call if PartialTrustHelper.AppDomainFullyTrusted is true.")]
            [SecuritySafeCritical]
            long GetDuration()
            {
                long currentTime = 0;
                long duration = 0;

                // Returns 0 when not fully trusted, when startTime is negative, or when the query fails.
                if (PartialTrustHelpers.AppDomainFullyTrusted && (this.startTime >= 0) &&
                    (UnsafeNativeMethods.QueryPerformanceCounter(out currentTime) != 0))
                {
                    duration = currentTime - this.startTime;
                }
                return duration;
            }

            // Callback for an asynchronous lock acquisition started by LockAndPassGuard.
            static void OnLockAcquired(object state, Exception asyncException)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)state;

                if (asyncException != null)
                {
                    // AcquireLock does not return an exception unless it doesn't have the lock
                    thisPtr.Complete(false, asyncException);

                    return;
                }

                thisPtr.ownsLock = true;

                bool completeSelf = true;
                Exception completionException = null;

                try
                {
                    completeSelf = thisPtr.PassGuard();
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }

                    completionException = e;
                }
                finally
                {
                    if (completeSelf)
                    {
                        Fx.Assert(!thisPtr.isWorkflowThread, "We should never be calling ReleaseLock if this is the workflow thread.");

                        thisPtr.instance.ReleaseLock(ref thisPtr.ownsLock, thisPtr.isIdlePolicyPersist && thisPtr.tryResult);
                    }
                }

                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }

            // Applies the Unload pre-checks, upgrades Unload to Delete for a completed workflow, and opens
            // the persistence context if it is still in the Created state before moving on to tracking.
            bool OpenProvider()
            {
                if (this.operation == PersistenceOperation.Unload)
                {
                    if (this.instance.state != State.Suspended && !this.instance.IsIdle)
                    {
                        if (this.isTry)
                        {
                            this.tryResult = false;
                            return true;
                        }
                        // Force unload
                    }

                    // Release the last referenceCount
                    if (!this.instance.TryReleaseLastReference())
                    {
                        if (this.isTry)
                        {
                            this.tryResult = false;
                            return true;
                        }
                        // Force unload
                    }
                }

                // We finally have the lock and are past the guard. Let's update our operation if this is an Unload.
                if (this.operation == PersistenceOperation.Unload && this.instance.Controller.State == WorkflowInstanceState.Complete)
                {
                    this.operation = PersistenceOperation.Delete;
                }

                bool completedSync = false;

                if (this.instance.persistenceContext != null && this.instance.persistenceContext.State == CommunicationState.Created)
                {
                    IAsyncResult result = this.instance.persistenceContext.BeginOpen(timeoutHelper.RemainingTime(),
                        PrepareInnerAsyncCompletion(providerOpenedCallback), this);

                    if (result.CompletedSynchronously)
                    {
                        completedSync = OnProviderOpened(result);
                    }
                }
                else
                {
                    completedSync = Track();
                }

                return completedSync;
            }

            // Ends the operation and returns tryResult.
            public static bool End(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = AsyncResult.End<UnloadOrPersistAsyncResult>(result);

                return thisPtr.tryResult;
            }

            // Wrapper around every inner completion: runs the inner callback and, when the operation is
            // finishing (normally or through an exception), applies state updates and releases the lock.
            static bool OutermostCallback(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;

                bool completeSelf = true;
                AsyncCompletion innerCallback = thisPtr.GetNextInnerAsyncCompletion();

                try
                {
                    completeSelf = innerCallback(result);
                }
                finally
                {
                    // We're exiting either on purpose or because of an exception
                    if (completeSelf)
                    {
                        if (thisPtr.updateState)
                        {
                            if (thisPtr.saveStatus != SaveStatus.Locked)
                            {
                                // Stop execution if we've given up the instance lock
                                thisPtr.instance.isRunnable = false;
                            }

                            if (thisPtr.isUnloaded)
                            {
                                thisPtr.instance.MarkUnloaded();
                            }
                            if (thisPtr.isIdlePolicyPersist && thisPtr.tryResult)
                            {
                                thisPtr.instance.DecrementBusyCount();
                            }
                        }

                        // We don't want to release the lock if we're the workflow thread
                        if (!thisPtr.isWorkflowThread)
                        {
                            thisPtr.instance.ReleaseLock(ref thisPtr.ownsLock, thisPtr.isIdlePolicyPersist && thisPtr.tryResult);
                        }
                    }
                }

                return completeSelf;
            }

            // Takes (and clears) the inner completion stored by PrepareInnerAsyncCompletion.
            AsyncCompletion GetNextInnerAsyncCompletion()
            {
                AsyncCompletion next = this.nextInnerAsyncCompletion;

                Fx.Assert(this.nextInnerAsyncCompletion != null, "Must have had one if we are calling GetNext");
                this.nextInnerAsyncCompletion = null;

                return next;
            }

            // Stores the inner completion and returns a callback that routes through OutermostCallback.
            AsyncCallback PrepareInnerAsyncCompletion(AsyncCompletion innerCallback)
            {
                this.nextInnerAsyncCompletion = innerCallback;

                return PrepareAsyncCompletion(outermostCallback);
            }

            static bool OnWaitForCanPersist(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;

                thisPtr.instance.EndWaitForCanPersist(result, ref thisPtr.ownsLock);

                return thisPtr.OpenProvider();
            }

            static bool OnProviderOpened(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;

                thisPtr.instance.persistenceContext.EndOpen(result);

                return thisPtr.Track();
            }

            // Emits the persistence tracking record (when a persistence context exists) and flushes
            // pending tracking records before collecting the data to persist.
            bool Track()
            {
                // Do the tracking before preparing in case the tracking data is being pushed into
                // an extension and persisted transactionally with the instance state.

                if (this.instance.persistenceContext != null)
                {
                    // We only track the persistence operation if we actually
                    // are persisting (and not just hitting PersistenceParticipants)
                    this.instance.TrackPersistence(this.operation);
                }

                if (this.instance.Controller.HasPendingTrackingRecords)
                {
                    IAsyncResult result = this.instance.Controller.BeginFlushTrackingRecords(this.instance.trackTimeout, PrepareInnerAsyncCompletion(trackingCompleteCallback), this);
                    return SyncContinue(result);
                }
                else
                {
                    return CollectAndMap();
                }
            }

            static bool OnTrackingComplete(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;

                thisPtr.instance.Controller.EndFlushTrackingRecords(result);

                return thisPtr.CollectAndMap();
            }

            // Generates the persistence data and runs the Collect/Map pipeline stages when modules exist.
            bool CollectAndMap()
            {
                // From this point forward we'll update the state unless we get a persistence exception
                this.updateState = true;

                Dictionary<XName, InstanceValue> initialPersistenceData = this.instance.GeneratePersistenceData();

                bool success = false;
                try
                {
                    List<IPersistencePipelineModule> modules = this.instance.PipelineModules;
                    if (modules != null)
                    {
                        Fx.Assert(modules.Count > 0, "should only setup modules if we have some");
                        this.pipeline = new PersistencePipeline(modules, initialPersistenceData);
                        this.pipeline.Collect();
                        this.pipeline.Map();
                        this.data = this.pipeline.Values;
                    }
                    else
                    {
                        this.data = initialPersistenceData;
                    }
                    success = true;
                }
                finally
                {
                    if (!success && this.context != null)
                    {
                        this.context.Abort();
                    }
                }

                if (this.instance.persistenceContext != null)
                {
                    return Persist();
                }
                else
                {
                    return Save();
                }
            }

            // Begins saving the collected data through the persistence context.
            bool Persist()
            {
                IAsyncResult result = null;
                try
                {
                    if (this.operation == PersistenceOperation.Delete)
                    {
                        this.saveStatus = SaveStatus.Completed;
                    }

                    if (this.context == null)
                    {
                        this.context = new WorkflowPersistenceContext(this.instance, (this.pipeline != null && this.pipeline.IsSaveTransactionRequired) || this.isCompletionTransactionRequired,
                            this.dependentTransaction, this.instance.persistTimeout);
                    }

                    using (PrepareTransactionalCall(this.context.PublicTransaction))
                    {
                        result = this.instance.persistenceContext.BeginSave(this.data, this.saveStatus, this.instance.persistTimeout, PrepareInnerAsyncCompletion(persistedCallback), this);
                    }
                }
                catch (InstancePersistenceException)
                {
                    // A persistence failure must not update the in-memory state.
                    this.updateState = false;
                    throw;
                }
                finally
                {
                    // result is still null only when BeginSave (or the setup before it) threw.
                    if (result == null && this.context != null)
                    {
                        this.context.Abort();
                    }
                }

                return SyncContinue(result);
            }

            static bool OnPersisted(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
                bool success = false;
                try
                {
                    thisPtr.instance.persistenceContext.EndSave(result);
                    success = true;
                }
                catch (InstancePersistenceException)
                {
                    thisPtr.updateState = false;
                    throw;
                }
                finally
                {
                    if (!success)
                    {
                        thisPtr.context.Abort();
                    }
                }

                return thisPtr.Save();
            }

            // Runs the pipeline's Save stage when a pipeline exists; otherwise goes straight to completion notification.
            bool Save()
            {
                if (this.pipeline != null)
                {
                    IAsyncResult result = null;
                    try
                    {
                        if (this.context == null)
                        {
                            this.context = new WorkflowPersistenceContext(this.instance, this.pipeline.IsSaveTransactionRequired || this.isCompletionTransactionRequired,
                                this.dependentTransaction, this.instance.persistTimeout);
                        }

                        // Publish the pipeline before checking abortingExtensions; the barrier keeps the
                        // write and the following read in that order.
                        this.instance.persistencePipelineInUse = this.pipeline;
                        Thread.MemoryBarrier();
                        if (this.instance.abortingExtensions)
                        {
                            throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
                        }

                        using (PrepareTransactionalCall(this.context.PublicTransaction))
                        {
                            result = this.pipeline.BeginSave(this.timeoutHelper.RemainingTime(), PrepareInnerAsyncCompletion(savedCallback), this);
                        }
                    }
                    finally
                    {
                        if (result == null)
                        {
                            this.instance.persistencePipelineInUse = null;
                            if (this.context != null)
                            {
                                this.context.Abort();
                            }
                        }
                    }
                    return SyncContinue(result);
                }
                else
                {
                    return NotifyCompletion();
                }
            }

            static bool OnSaved(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;

                bool success = false;
                try
                {
                    thisPtr.pipeline.EndSave(result);
                    success = true;
                }
                finally
                {
                    thisPtr.instance.persistencePipelineInUse = null;
                    if (!success)
                    {
                        thisPtr.context.Abort();
                    }
                }

                return thisPtr.NotifyCompletion();
            }

            // Notifies the creation context that the workflow completed (only for an unloading, completed
            // instance that has a creation context); otherwise proceeds to complete the persistence context.
            bool NotifyCompletion()
            {
                if (this.isUnloaded && this.instance.Controller.State == WorkflowInstanceState.Complete && this.instance.creationContext != null)
                {
                    IAsyncResult result = null;
                    try
                    {
                        if (this.context == null)
                        {
                            this.context = new WorkflowPersistenceContext(this.instance, this.isCompletionTransactionRequired,
                                this.dependentTransaction, this.instance.persistTimeout);
                        }

                        using (PrepareTransactionalCall(this.context.PublicTransaction))
                        {
                            result = this.instance.creationContext.OnBeginWorkflowCompleted(this.instance.completionState, this.instance.workflowOutputs, this.instance.terminationException,
                                this.timeoutHelper.RemainingTime(), PrepareInnerAsyncCompletion(notifyCompletionCallback), this);
                            if (result == null)
                            {
                                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowCompletionAsyncResultCannotBeNull));
                            }
                        }
                    }
                    finally
                    {
                        if (result == null && this.context != null)
                        {
                            this.context.Abort();
                        }
                    }
                    return SyncContinue(result);
                }
                else
                {
                    return CompleteContext();
                }
            }

            static bool OnNotifyCompletion(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;

                bool success = false;
                try
                {
                    thisPtr.instance.creationContext.OnEndWorkflowCompleted(result);
                    success = true;
                }
                finally
                {
                    if (!success)
                    {
                        thisPtr.context.Abort();
                    }
                }

                return thisPtr.CompleteContext();
            }

            // Reports the persist duration (Save only) and completes the persistence context if one was created.
            bool CompleteContext()
            {
                bool wentAsync = false;
                IAsyncResult completeResult = null;

                // Computing Persist Duration.
                if (this.operation == PersistenceOperation.Save)
                {
                    this.instance.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowPersistDuration(GetDuration());
                }

                if (this.context != null)
                {
                    wentAsync = this.context.TryBeginComplete(this.PrepareInnerAsyncCompletion(completeContextCallback), this, out completeResult);
                }

                // we have persisted deleted state. this is to address TransactedTerminate avoiding
                // multiple deleted persistence.
                this.instance.hasPersistedDeleted = this.operation == PersistenceOperation.Delete;

                if (wentAsync)
                {
                    Fx.Assert(completeResult != null, "We shouldn't have null here because we would have rethrown or gotten false for went async.");
                    return SyncContinue(completeResult);
                }
                else
                {
                    // We completed synchronously if we didn't get an async result out of
                    // TryBeginComplete
                    return true;
                }
            }

            static bool OnCompleteContext(IAsyncResult result)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
                thisPtr.context.EndComplete(result);
                return true;
            }

            // OnCompleting hook (installed only when an ambient transaction existed): completes the dependent clone.
            static void OnComplete(AsyncResult result, Exception exception)
            {
                UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result;
                if (thisPtr.dependentTransaction != null)
                {
                    thisPtr.dependentTransaction.Complete();
                }
            }
        }

        // Base class for operations that acquire the execution lock, validate state, optionally attach a
        // transaction, perform the operation (sync or async), flush tracking records and release the lock.
        abstract class SimpleOperationAsyncResult : AsyncResult
        {
            static FastAsyncCallback lockAcquiredCallback = new FastAsyncCallback(OnLockAcquired);
            static Action<AsyncResult, Exception> onCompleting = new Action<AsyncResult, Exception>(Finally);
            // Created lazily on first use.
            static AsyncCompletion handleEndPerformOperation;
            static AsyncCompletion handleEndTrack;

            protected WorkflowServiceInstance instance;
            protected TimeoutHelper timeoutHelper;
            protected bool ownsLock;

            protected SimpleOperationAsyncResult(WorkflowServiceInstance instance, Transaction transaction, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.instance = instance;
                this.OperationTransaction = transaction;
                this.OnCompleting = onCompleting;
            }

            protected
WorkflowServiceInstance Instance
            {
                get
                {
                    return this.instance;
                }
            }

            protected Transaction OperationTransaction
            {
                get;
                private set;
            }

            // Derived classes return false to use BeginPerformOperation/EndPerformOperation instead of PerformOperation.
            protected virtual bool IsSynchronousOperation
            {
                get
                {
                    return true;
                }
            }

            // Starts the operation: acquires the execution lock and, if obtained synchronously, runs the
            // operation inline. Non-fatal exceptions become the completion exception.
            protected void Run(TimeSpan timeout)
            {
                this.timeoutHelper = new TimeoutHelper(timeout);

                Exception completionException = null;
                bool completeSelf = true;

                if (this.instance.AcquireLockAsync(this.timeoutHelper.RemainingTime(), ref this.ownsLock, lockAcquiredCallback, this))
                {
                    try
                    {
                        completeSelf = HandleLockAcquired();
                    }
                    catch (Exception exception)
                    {
                        if (Fx.IsFatal(exception))
                        {
                            throw;
                        }
                        completionException = exception;
                    }
                }
                else
                {
                    // OnLockAcquired continues the operation.
                    completeSelf = false;
                }

                if (completeSelf)
                {
                    Complete(true, completionException);
                }
            }

            // Callback for an asynchronous lock acquisition started by Run.
            static void OnLockAcquired(object state, Exception asyncException)
            {
                SimpleOperationAsyncResult thisPtr = (SimpleOperationAsyncResult)state;

                if (asyncException != null)
                {
                    thisPtr.Complete(false, asyncException);
                }
                else
                {
                    thisPtr.ownsLock = true;

                    Exception completionException = null;
                    bool completeSelf = true;

                    try
                    {
                        completeSelf = thisPtr.HandleLockAcquired();
                    }
                    catch (Exception exception)
                    {
                        if (Fx.IsFatal(exception))
                        {
                            throw;
                        }
                        completionException = exception;
                    }

                    if (completeSelf)
                    {
                        thisPtr.Complete(false, completionException);
                    }
                }
            }

            // Proceeds only when ValidateState returns true; otherwise the operation completes without running.
            bool HandleLockAcquired()
            {
                if (ValidateState())
                {
                    return AttachTransaction();
                }
                else
                {
                    return true;
                }
            }

            // Creates the instance's transaction context when an operation transaction was supplied and none
            // exists yet, then performs the operation synchronously or asynchronously.
            bool AttachTransaction()
            {
                if (this.OperationTransaction != null && this.Instance.transactionContext == null)
                {
                    this.Instance.transactionContext = new TransactionContext(this.Instance, this.OperationTransaction);
                    this.Instance.isInTransaction = true;
                    this.Instance.isRunnable = false;
                }

                if (this.IsSynchronousOperation)
                {
                    PerformOperation();
                    return Track();
                }
                else
                {
                    if (handleEndPerformOperation == null)
                    {
                        handleEndPerformOperation = new AsyncCompletion(HandleEndPerformOperation);
                    }

                    IAsyncResult result = BeginPerformOperation(PrepareAsyncCompletion(handleEndPerformOperation), this);
                    if (result.CompletedSynchronously)
                    {
                        return HandleEndPerformOperation(result);
                    }
                    else
                    {
                        return false;
                    }
                }
            }

            static bool HandleEndPerformOperation(IAsyncResult result)
            {
                SimpleOperationAsyncResult thisPtr = (SimpleOperationAsyncResult)result.AsyncState;
                thisPtr.EndPerformOperation(result);
                return thisPtr.Track();
            }

            // Flushes pending tracking records (unless the instance is aborted), then releases the lock.
            bool Track()
            {
                // For aborted, the AbortInstance will handle tracking.
                if (this.instance.state != State.Aborted && this.instance.Controller.HasPendingTrackingRecords)
                {
                    if (handleEndTrack == null)
                    {
                        handleEndTrack = new AsyncCompletion(HandleEndTrack);
                    }

                    IAsyncResult result = this.instance.Controller.BeginFlushTrackingRecords(this.instance.trackTimeout, PrepareAsyncCompletion(handleEndTrack), this);
                    if (result.CompletedSynchronously)
                    {
                        return HandleEndTrack(result);
                    }
                    else
                    {
                        return false;
                    }
                }
                else
                {
                    return ReleaseLock();
                }
            }

            static bool HandleEndTrack(IAsyncResult result)
            {
                SimpleOperationAsyncResult thisPtr = (SimpleOperationAsyncResult)result.AsyncState;
                thisPtr.instance.Controller.EndFlushTrackingRecords(result);
                return thisPtr.ReleaseLock();
            }

            // Releases the execution lock first, then runs the derived PostOperation hook.
            bool ReleaseLock()
            {
                this.instance.ReleaseLock(ref this.ownsLock);
                PostOperation();
                return true;
            }

            // OnCompleting hook: releases the lock if a failure left it held.
            static void Finally(AsyncResult result, Exception completionException)
            {
                SimpleOperationAsyncResult thisPtr = (SimpleOperationAsyncResult)result;
                if (thisPtr.ownsLock)
                {
                    thisPtr.instance.ReleaseLock(ref thisPtr.ownsLock);
                }
            }

            protected abstract bool ValidateState();
            protected abstract void PerformOperation();
            // Must be overridden when IsSynchronousOperation returns false.
            protected virtual IAsyncResult BeginPerformOperation(AsyncCallback callback, object state)
            {
                throw Fx.AssertAndThrow("Should not reach here!");
            }
            // Must be overridden when IsSynchronousOperation returns false.
            protected virtual void EndPerformOperation(IAsyncResult result)
            {
                throw Fx.AssertAndThrow("Should not reach here!");
            }
            protected abstract void PostOperation();
        }

        class TerminateAsyncResult :
// Terminates the workflow instance with the supplied reason, optionally under a transaction.
class TerminateAsyncResult : SimpleOperationAsyncResult
{
    Exception reason;

    TerminateAsyncResult(WorkflowServiceInstance instance, Exception reason, Transaction transaction, AsyncCallback callback, object state)
        : base(instance, transaction, callback, state)
    {
        this.reason = reason;
    }

    // Creates the result and immediately starts the operation.
    public static TerminateAsyncResult Create(WorkflowServiceInstance instance, Exception reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)
    {
        TerminateAsyncResult operation = new TerminateAsyncResult(instance, reason, transaction, callback, state);
        operation.Run(timeout);
        return operation;
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<TerminateAsyncResult>(result);
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForTerminate(this.OperationTransaction);
    }

    protected override void PerformOperation()
    {
        WorkflowServiceInstance target = this.Instance;
        target.Controller.Terminate(this.reason);

        // Reset suspended reason for Cancel and Terminate
        if (target.persistenceContext.IsSuspended)
        {
            target.persistenceContext.IsSuspended = false;
            target.persistenceContext.SuspendedReason = null;
        }

        if (target.isInTransaction)
        {
            // For transacted, the unload will happen at Tx committed time.
            target.GetCompletionState();
        }
        else
        {
            // For non-transacted, we used the normal pulse to complete/unload the workflow.
            target.isRunnable = true;
            target.state = State.Active;
        }
    }

    protected override void PostOperation()
    {
        this.Instance.CompletePendingOperations();
    }
}
// Abandons the instance. Depending on the flags it either releases the persistence context and
// aborts the instance (async path), or only restores the last reference so that the pending
// unload is cancelled and the instance stays intact (sync path).
class AbandonAsyncResult : SimpleOperationAsyncResult
{
    Exception reason;

    // The shouldTrackAbort flag is only false when idle policy has TimeToPersist < TimeToUnload.
    bool shouldTrackAbort;

    AbandonAsyncResult(WorkflowServiceInstance instance, Exception reason, bool shouldTrackAbort, AsyncCallback callback, object state)
        : base(instance, null, callback, state)
    {
        this.reason = reason;
        this.shouldTrackAbort = shouldTrackAbort;
    }

    // Creates the result and immediately starts the operation.
    public static AbandonAsyncResult Create(WorkflowServiceInstance instance, Exception reason, bool shouldTrackAbort, TimeSpan timeout, AsyncCallback callback, object state)
    {
        AbandonAsyncResult operation = new AbandonAsyncResult(instance, reason, shouldTrackAbort, callback, state);
        operation.Run(timeout);
        return operation;
    }

    protected override bool IsSynchronousOperation
    {
        get
        {
            // We go through the synchronous code path only when we want to terminate the unload.
            // We want to terminate the unload only when
            // TimeToPersist < TimeToUnload AND instance is dirty and waiting to be persisted by idle policy.

            // The hasDataToPersist flag should only be read under the executor lock.
            return !this.shouldTrackAbort && this.Instance.hasDataToPersist;
        }
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<AbandonAsyncResult>(result);
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForAbort();
    }

    protected override void PerformOperation()
    {
        // This is the synchronous code path. This path terminates the unload and leaves the instance intact.
        Fx.Assert(!this.shouldTrackAbort && this.Instance.hasDataToPersist, "We should only get here when we need to terminate the unload.");

        // Since reference count has already been decremented to 0 by now, we should set it back to 1.
        this.Instance.RecoverLastReference();
    }

    // Starts releasing the persistence context; a non-fatal failure aborts the instance before rethrowing.
    protected override IAsyncResult BeginPerformOperation(AsyncCallback callback, object state)
    {
        try
        {
            return this.Instance.persistenceContext.BeginRelease(this.Instance.persistTimeout, callback, state);
        }
        catch (Exception exception)
        {
            if (!Fx.IsFatal(exception))
            {
                this.Instance.AbortInstance(this.reason, true);
            }

            throw;
        }
    }

    // Completes the release, records the unload (only when the abort itself is not tracked) and
    // aborts the instance. A non-fatal failure aborts the instance before rethrowing.
    protected override void EndPerformOperation(IAsyncResult result)
    {
        try
        {
            this.Instance.persistenceContext.EndRelease(result);

            if (!this.shouldTrackAbort)
            {
                if (this.Instance.Controller.TrackingEnabled)
                {
                    this.Instance.Controller.Track(new WorkflowInstanceRecord(this.Instance.Id, this.Instance.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Unloaded, this.Instance.DefinitionIdentity));
                }

                this.instance.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowUnloaded();
            }

            this.Instance.AbortInstance(this.reason, true, this.shouldTrackAbort);
        }
        catch (Exception exception)
        {
            if (!Fx.IsFatal(exception))
            {
                this.Instance.AbortInstance(this.reason, true);
            }

            throw;
        }
    }

    protected override void PostOperation()
    {
    }
}
// Writes the suspend metadata for the instance through the persistence context and then
// aborts the in-memory instance. Always takes the asynchronous path.
class AbandonAndSuspendAsyncResult : SimpleOperationAsyncResult
{
    Exception reason;

    AbandonAndSuspendAsyncResult(WorkflowServiceInstance instance, Exception reason, AsyncCallback callback, object state)
        : base(instance, null, callback, state)
    {
        this.reason = reason;
    }

    // Creates the result and immediately starts the operation.
    public static AbandonAndSuspendAsyncResult Create(WorkflowServiceInstance instance, Exception reason, TimeSpan timeout, AsyncCallback callback, object state)
    {
        AbandonAndSuspendAsyncResult operation = new AbandonAndSuspendAsyncResult(instance, reason, callback, state);
        operation.Run(timeout);
        return operation;
    }

    protected override bool IsSynchronousOperation
    {
        get { return false; }
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<AbandonAndSuspendAsyncResult>(result);
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForAbort();
    }

    // Never used: this operation is asynchronous only.
    protected override void PerformOperation()
    {
        throw Fx.AssertAndThrow("Should not reach here!");
    }

    // Starts the metadata update; a non-fatal failure aborts the instance before rethrowing.
    protected override IAsyncResult BeginPerformOperation(AsyncCallback callback, object state)
    {
        try
        {
            return this.Instance.persistenceContext.BeginUpdateSuspendMetadata(this.reason, this.Instance.persistTimeout, callback, state);
        }
        catch (Exception exception)
        {
            if (!Fx.IsFatal(exception))
            {
                this.Instance.AbortInstance(this.reason, true);
            }

            throw;
        }
    }

    // Completes the metadata update, emits the suspended tracking record and bumps the
    // performance counter. The instance is aborted whether or not those steps succeed.
    protected override void EndPerformOperation(IAsyncResult result)
    {
        try
        {
            this.Instance.persistenceContext.EndUpdateSuspendMetadata(result);

            AbandonAndSuspendAsyncResult owner = (AbandonAndSuspendAsyncResult)result.AsyncState;
            if (this.Instance.Controller.TrackingEnabled)
            {
                WorkflowInstanceSuspendedRecord record = new WorkflowInstanceSuspendedRecord(this.Instance.Id, this.Instance.WorkflowDefinition.DisplayName, owner.reason.Message, this.Instance.DefinitionIdentity);
                this.Instance.Controller.Track(record);
            }

            this.Instance.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowSuspended();
        }
        finally
        {
            this.Instance.AbortInstance(this.reason, true);
        }
    }

    protected override void PostOperation()
    {
    }
}
// Schedules cancellation of the workflow instance, optionally under a transaction.
class CancelAsyncResult : SimpleOperationAsyncResult
{
    CancelAsyncResult(WorkflowServiceInstance instance, Transaction transaction, AsyncCallback callback, object state)
        : base(instance, transaction, callback, state)
    {
    }

    // Creates the result and immediately starts the operation.
    public static CancelAsyncResult Create(WorkflowServiceInstance instance, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)
    {
        CancelAsyncResult operation = new CancelAsyncResult(instance, transaction, callback, state);
        operation.Run(timeout);
        return operation;
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<CancelAsyncResult>(result);
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForCancel(this.OperationTransaction);
    }

    protected override void PerformOperation()
    {
        WorkflowServiceInstance target = this.Instance;
        target.Controller.ScheduleCancel();

        // Reset suspended reason for Cancel and Terminate
        if (target.persistenceContext.IsSuspended)
        {
            target.persistenceContext.IsSuspended = false;
            target.persistenceContext.SuspendedReason = null;
        }

        if (target.isInTransaction)
        {
            // For transacted, the unload will happen at Tx committed time.
            target.isTransactedCancelled = true;
        }
        else
        {
            // Cancel implies a state change to runnable.
            target.isRunnable = true;
            target.state = State.Active;
        }
    }

    protected override void PostOperation()
    {
        this.Instance.CompletePendingOperations();
    }
}
// Runs the workflow instance on behalf of the named operation. Under a transaction the run is
// skipped here (PerformOperation only calls RunCore when the instance is not in a transaction).
class RunAsyncResult : SimpleOperationAsyncResult
{
    string operationName;

    RunAsyncResult(WorkflowServiceInstance instance, Transaction transaction, string operationName, AsyncCallback callback, object state)
        : base(instance, transaction, callback, state)
    {
        this.operationName = operationName;
    }

    // Creates the result and immediately starts the operation.
    public static RunAsyncResult Create(WorkflowServiceInstance instance, Transaction transaction, string operationName, TimeSpan timeout, AsyncCallback callback, object state)
    {
        RunAsyncResult operation = new RunAsyncResult(instance, transaction, operationName, callback, state);
        operation.Run(timeout);
        return operation;
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<RunAsyncResult>(result);
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForRun(this.OperationTransaction, this.operationName);
    }

    protected override void PerformOperation()
    {
        if (this.Instance.isInTransaction)
        {
            return;
        }

        this.Instance.RunCore();
    }

    protected override void PostOperation()
    {
    }
}
// Suspends the workflow instance. The work is asynchronous: it first waits until the instance
// can persist (SuspendCoreAsyncResult), then marks it suspended, tracks the suspension and
// asks the controller to pause.
class SuspendAsyncResult : SimpleOperationAsyncResult
{
    bool isUnlocked;
    string reason;

    SuspendAsyncResult(WorkflowServiceInstance instance, bool isUnlocked, string reason, Transaction transaction, AsyncCallback callback, object state)
        : base(instance, transaction, callback, state)
    {
        this.isUnlocked = isUnlocked;
        this.reason = reason;
    }

    // Creates the result and immediately starts the operation.
    public static SuspendAsyncResult Create(WorkflowServiceInstance instance, bool isUnlocked, string reason, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)
    {
        SuspendAsyncResult operation = new SuspendAsyncResult(instance, isUnlocked, reason, transaction, callback, state);
        operation.Run(timeout);
        return operation;
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<SuspendAsyncResult>(result);
    }

    protected override bool IsSynchronousOperation
    {
        get { return false; }
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForSuspend(this.OperationTransaction);
    }

    // Never used: this operation is asynchronous only.
    protected override void PerformOperation()
    {
        throw Fx.AssertAndThrow("Should not reach here!");
    }

    protected override IAsyncResult BeginPerformOperation(AsyncCallback callback, object state)
    {
        return new SuspendCoreAsyncResult(this, callback, state);
    }

    protected override void EndPerformOperation(IAsyncResult result)
    {
        SuspendCoreAsyncResult.End(result);
    }

    protected override void PostOperation()
    {
        this.Instance.CompletePendingOperations();
    }

    // Inner result that waits for the instance to become persistable and then applies the
    // suspended state. It reads and updates the parent's ownsLock flag and timeout helper.
    class SuspendCoreAsyncResult : AsyncResult
    {
        static AsyncCompletion handleEndWaitForCanPersist = new AsyncCompletion(HandleEndWaitForCanPersist);

        SuspendAsyncResult parent;

        public SuspendCoreAsyncResult(SuspendAsyncResult parent, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.parent = parent;

            IAsyncResult pending = this.parent.Instance.BeginWaitForCanPersist(
                ref this.parent.ownsLock,
                this.parent.timeoutHelper.RemainingTime(),
                PrepareAsyncCompletion(handleEndWaitForCanPersist),
                this);

            if (SyncContinue(pending))
            {
                this.Complete(true);
            }
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<SuspendCoreAsyncResult>(result);
        }

        static bool HandleEndWaitForCanPersist(IAsyncResult result)
        {
            SuspendCoreAsyncResult thisPtr = (SuspendCoreAsyncResult)result.AsyncState;
            SuspendAsyncResult owner = thisPtr.parent;

            owner.Instance.EndWaitForCanPersist(result, ref owner.ownsLock);

            owner.Instance.persistenceContext.IsSuspended = true;
            owner.Instance.persistenceContext.SuspendedReason = owner.reason;
            owner.Instance.state = State.Suspended;

            if (owner.Instance.Controller.TrackingEnabled)
            {
                WorkflowInstanceSuspendedRecord record = new WorkflowInstanceSuspendedRecord(owner.Instance.Id, owner.Instance.WorkflowDefinition.DisplayName, owner.reason, owner.Instance.DefinitionIdentity);
                owner.Instance.Controller.Track(record);
            }

            owner.instance.serviceHost.WorkflowServiceHostPerformanceCounters.WorkflowSuspended();

            // This is to handle a corner case where Pause is called
            // from an event handler:
            //    Case 1: Called while executing - pauses the scheduler
            //       in order to obtain the lock and ReleaseLock never
            //       calls resume.
            //    Case 2: Called while not executing - no need to pause
            //       the scheduler because ReleaseLock makes sure never
            //       to tell it to post.
            //    Case 3: Called from UnhandledException handler - the
            //       scheduler is unpaused and ReleaseLock doesn't
            //       control the fate of this thread.  Instead, this
            //       thread will return to the scheduler unless we
            //       tell it to Pause here.
            owner.Instance.Controller.RequestPause();

            return true;
        }
    }
}
// Clears the suspended state of the instance and returns it to State.Active.
class UnsuspendAsyncResult : SimpleOperationAsyncResult
{
    UnsuspendAsyncResult(WorkflowServiceInstance instance, Transaction transaction, AsyncCallback callback, object state)
        : base(instance, transaction, callback, state)
    {
    }

    // Creates the result and immediately starts the operation.
    public static UnsuspendAsyncResult Create(WorkflowServiceInstance instance, Transaction transaction, TimeSpan timeout, AsyncCallback callback, object state)
    {
        UnsuspendAsyncResult operation = new UnsuspendAsyncResult(instance, transaction, callback, state);
        operation.Run(timeout);
        return operation;
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<UnsuspendAsyncResult>(result);
    }

    protected override bool ValidateState()
    {
        return this.Instance.ValidateStateForUnsuspend(this.OperationTransaction);
    }

    protected override void PerformOperation()
    {
        WorkflowServiceInstance target = this.Instance;

        // The instance only becomes runnable right away when no transaction is in flight.
        if (!target.isInTransaction)
        {
            target.isRunnable = true;
        }

        target.persistenceContext.IsSuspended = false;
        target.persistenceContext.SuspendedReason = null;
        target.state = State.Active;

        if (target.Controller.TrackingEnabled)
        {
            WorkflowInstanceRecord record = new WorkflowInstanceRecord(target.Id, target.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Unsuspended, target.DefinitionIdentity);
            target.Controller.Track(record);
        }
    }

    protected override void PostOperation()
    {
    }
}
// Acquires the executor lock and, if the workflow is still running, waits until it goes idle.
// The ownership flag is reported to the caller through End(result, ref ownsLock); when the
// result completes synchronously the constructor's ref ownsLock argument carries it instead.
class AcquireLockOnIdleAsyncResult : AsyncResult
{
    static FastAsyncCallback lockAcquiredCallback = new FastAsyncCallback(OnLockAcquired);
    static Action<object, TimeoutException> idleReceivedCallback = new Action<object, TimeoutException>(OnIdleReceived);

    AsyncWaitHandle idleEvent;
    WorkflowServiceInstance instance;
    TimeoutHelper timeoutHelper;

    // Lock ownership as seen by the asynchronous callbacks; copied out by End(result, ref ownsLock).
    bool acquiredLockAsynchronously;

    public AcquireLockOnIdleAsyncResult(WorkflowServiceInstance instance, TimeSpan timeout, ref bool ownsLock, AsyncCallback callback, object state)
        : base(callback, state)
    {
        Fx.Assert(!ownsLock, "We should never call acquire if we already think we own the lock.");

        // We cannot just hand off the lock if we are in a handler thread
        // because this might eventually go async (during the operation)
        // and we could have multiple operations occurring concurrently.

        this.instance = instance;
        this.timeoutHelper = new TimeoutHelper(timeout);

        bool incrementedActiveOperations = false;
        bool decrementActiveOperations = true;
        bool completeSelf = true;
        object lockToken = null;

        try
        {
            lock (this.instance.activeOperationsLock)
            {
                // NOTE(review): the empty try with the work in finally presumably keeps the
                // increment and its flag together if the thread is aborted - confirm.
                try
                {
                }
                finally
                {
                    this.instance.activeOperations++;
                    incrementedActiveOperations = true;
                }

                this.instance.executorLock.SetupWaiter(ref lockToken);
            }

            completeSelf = this.instance.executorLock.EnterAsync(this.timeoutHelper.RemainingTime(), ref lockToken, ref ownsLock, lockAcquiredCallback, this);

            // We don't want to decrement the count if we went async
            // because the async callback will do the decrement
            decrementActiveOperations = completeSelf;
        }
        finally
        {
            if (incrementedActiveOperations && decrementActiveOperations)
            {
                lock (this.instance.activeOperationsLock)
                {
                    this.instance.activeOperations--;
                }
            }

            this.instance.executorLock.CleanupWaiter(lockToken, ref ownsLock);
        }

        if (completeSelf)
        {
            if (CheckState(ref ownsLock))
            {
                Complete(true);
            }
        }
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<AcquireLockOnIdleAsyncResult>(result);
    }

    // Ends the operation and reports lock ownership through ownsLock. The flag is copied
    // before the base End call, so it is set even when End throws.
    public static void End(IAsyncResult result, ref bool ownsLock)
    {
        // We don't care about validating type because worst
        // case scenario we skip this section and validation
        // occurs in the base AsyncResult call.
        AcquireLockOnIdleAsyncResult thisPtr = result as AcquireLockOnIdleAsyncResult;

        if (thisPtr != null)
        {
            ownsLock = thisPtr.acquiredLockAsynchronously;
        }

        AsyncResult.End<AcquireLockOnIdleAsyncResult>(result);
    }

    // Callback for EnterAsync when the lock was not granted synchronously.
    static void OnLockAcquired(object state, Exception asyncException)
    {
        AcquireLockOnIdleAsyncResult thisPtr = (AcquireLockOnIdleAsyncResult)state;

        // Balances the increment done in the constructor (which skipped its own decrement
        // because the acquisition went async).
        lock (thisPtr.instance.activeOperationsLock)
        {
            thisPtr.instance.activeOperations--;
        }

        if (asyncException != null)
        {
            thisPtr.Complete(false, asyncException);
            return;
        }

        bool completeSelf = true;
        Exception completionException = null;

        try
        {
            thisPtr.acquiredLockAsynchronously = true;
            completeSelf = thisPtr.CheckState(ref thisPtr.acquiredLockAsynchronously);
        }
        catch (Exception e)
        {
            if (Fx.IsFatal(e))
            {
                throw;
            }

            completionException = e;
        }

        if (completeSelf)
        {
            thisPtr.Complete(false, completionException);
        }
    }

    // Called with the lock held. Returns true when the result can complete now, false when
    // it must wait for the idle event (OnIdleReceived completes it then).
    bool CheckState(ref bool ownsLock)
    {
        if (this.instance.state == State.Active && !this.instance.isRunnable)
        {
            this.instance.RunCore();
        }

        // If instance state is non-Active, the AcquireOnIdle will succeed (WSI is doing nothing),
        // the caller is responsible for dealing with state vs. operation.
        // For instance, ResumeBookmark will call ValidateStateForResumeProtocolBookmark.
        if (this.instance.state == State.Active && this.instance.Controller.State == WorkflowInstanceState.Runnable)
        {
            this.idleEvent = this.instance.SetupIdleWaiter(ref ownsLock);

            try
            {
                if (this.idleEvent.WaitAsync(idleReceivedCallback, this, this.timeoutHelper.RemainingTime()))
                {
                    ownsLock = true;
                }
                else
                {
                    return false;
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }

                // CleanupIdleWaiter decides whether the failure is rethrown (true) or
                // swallowed (false, in which case we fall through and complete).
                if (this.instance.CleanupIdleWaiter(this.idleEvent, e, ref ownsLock))
                {
                    throw;
                }
            }
        }

        return true;
    }

    // Callback for the idle wait; asyncException is non-null when the wait timed out.
    static void OnIdleReceived(object state, TimeoutException asyncException)
    {
        AcquireLockOnIdleAsyncResult thisPtr = (AcquireLockOnIdleAsyncResult)state;

        if (asyncException != null)
        {
            if (thisPtr.instance.CleanupIdleWaiter(thisPtr.idleEvent, asyncException, ref thisPtr.acquiredLockAsynchronously))
            {
                Fx.Assert(!thisPtr.acquiredLockAsynchronously, "We shouldn't own the lock if we're rethrowing");
                thisPtr.Complete(false, asyncException);
                return;
            }

            Fx.Assert(thisPtr.acquiredLockAsynchronously, "We should own the lock if we're swallowing the exception");
        }

        thisPtr.acquiredLockAsynchronously = true;

        thisPtr.Complete(false, null);
    }
}
bool ownsLock; 4966 bool mustWait; 4967 AsyncWaitHandle checkCanPersistEvent; 4968 WaitForCanPersistAsyncResult(WorkflowServiceInstance instance, ref bool ownsLock, TimeSpan timeout, AsyncCallback callback, object state)4969 public WaitForCanPersistAsyncResult(WorkflowServiceInstance instance, ref bool ownsLock, TimeSpan timeout, AsyncCallback callback, object state) 4970 : base(callback, state) 4971 { 4972 this.instance = instance; 4973 this.ownsLock = ownsLock; 4974 this.timeoutHelper = new TimeoutHelper(timeout); 4975 4976 Fx.Assert(ownsLock, "Must be called under locked!"); 4977 4978 if (WaitForCanPersist()) 4979 { 4980 Complete(true); 4981 } 4982 } 4983 End(IAsyncResult result, ref bool ownsLock)4984 public static void End(IAsyncResult result, ref bool ownsLock) 4985 { 4986 // We don't care about validating type because worst 4987 // case scenario we skip this section and validation 4988 // occurs in the base AsyncResult call. 4989 WaitForCanPersistAsyncResult thisPtr = result as WaitForCanPersistAsyncResult; 4990 4991 if (thisPtr != null) 4992 { 4993 ownsLock = thisPtr.ownsLock; 4994 } 4995 4996 AsyncResult.End<WaitForCanPersistAsyncResult>(result); 4997 } 4998 WaitForCanPersist()4999 bool WaitForCanPersist() 5000 { 5001 if (this.instance.Controller.IsPersistable) 5002 { 5003 return true; 5004 } 5005 5006 this.instance.Controller.PauseWhenPersistable(); 5007 5008 this.mustWait = false; 5009 if (this.instance.IsIdle) 5010 { 5011 if (this.checkCanPersistEvent == null) 5012 { 5013 this.checkCanPersistEvent = new AsyncWaitHandle(EventResetMode.AutoReset); 5014 } 5015 5016 // Will be signaled when WF is paused. 
5017 this.instance.AddCheckCanPersistWaiter(this); 5018 this.mustWait = true; 5019 } 5020 5021 this.instance.ReleaseLock(ref this.ownsLock); 5022 5023 if (this.mustWait) 5024 { 5025 if (onWaitEvent == null) 5026 { 5027 onWaitEvent = new Action<object, TimeoutException>(OnWaitEvent); 5028 } 5029 5030 if (this.checkCanPersistEvent.WaitAsync(onWaitEvent, this, this.timeoutHelper.RemainingTime())) 5031 { 5032 return HandleWaitEvent(); 5033 } 5034 else 5035 { 5036 return false; 5037 } 5038 } 5039 else 5040 { 5041 return HandleWaitEvent(); 5042 } 5043 } 5044 OnWaitEvent(object state, TimeoutException asyncException)5045 static void OnWaitEvent(object state, TimeoutException asyncException) 5046 { 5047 WaitForCanPersistAsyncResult thisPtr = (WaitForCanPersistAsyncResult)state; 5048 5049 if (asyncException != null) 5050 { 5051 thisPtr.Complete(false, asyncException); 5052 return; 5053 } 5054 5055 bool completeSelf = true; 5056 Exception completionException = null; 5057 5058 try 5059 { 5060 completeSelf = thisPtr.HandleWaitEvent(); 5061 } 5062 catch (Exception exception) 5063 { 5064 if (Fx.IsFatal(exception)) 5065 { 5066 throw; 5067 } 5068 5069 completionException = exception; 5070 } 5071 5072 if (completeSelf) 5073 { 5074 thisPtr.Complete(false, completionException); 5075 } 5076 } 5077 SetEvent(ref bool ownsLock)5078 public void SetEvent(ref bool ownsLock) 5079 { 5080 this.ownsLock = ownsLock; 5081 ownsLock = false; 5082 this.checkCanPersistEvent.Set(); 5083 } 5084 HandleWaitEvent()5085 bool HandleWaitEvent() 5086 { 5087 return AcquireLockWithoutPause(); 5088 } 5089 AcquireLockWithoutPause()5090 bool AcquireLockWithoutPause() 5091 { 5092 if (!this.instance.IsHandlerThread && !this.ownsLock) 5093 { 5094 if (onLockAcquired == null) 5095 { 5096 onLockAcquired = new FastAsyncCallback(OnLockAcquired); 5097 } 5098 5099 if (this.instance.AcquireLockAsync(this.timeoutHelper.RemainingTime(), false, true, ref this.ownsLock, onLockAcquired, this)) 5100 { 5101 return 
HandleLockAcquired(); 5102 } 5103 else 5104 { 5105 return false; 5106 } 5107 } 5108 else 5109 { 5110 return HandleLockAcquired(); 5111 } 5112 } 5113 OnLockAcquired(object state, Exception asyncException)5114 static void OnLockAcquired(object state, Exception asyncException) 5115 { 5116 WaitForCanPersistAsyncResult thisPtr = (WaitForCanPersistAsyncResult)state; 5117 5118 if (asyncException != null) 5119 { 5120 thisPtr.Complete(false, asyncException); 5121 return; 5122 } 5123 5124 thisPtr.ownsLock = true; 5125 5126 bool completeSelf = true; 5127 Exception completionException = null; 5128 5129 try 5130 { 5131 completeSelf = thisPtr.HandleLockAcquired(); 5132 } 5133 catch (Exception exception) 5134 { 5135 if (Fx.IsFatal(exception)) 5136 { 5137 throw; 5138 } 5139 5140 completionException = exception; 5141 } 5142 5143 if (completeSelf) 5144 { 5145 thisPtr.Complete(false, completionException); 5146 } 5147 } 5148 HandleLockAcquired()5149 bool HandleLockAcquired() 5150 { 5151 this.instance.ValidateStateForPersist(); 5152 return WaitForCanPersist(); 5153 } 5154 } 5155 5156 [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Exit")] 5157 class WorkflowExecutionLock 5158 { 5159 static Action<object, TimeoutException> asyncWaiterSignaledCallback = new Action<object, TimeoutException>(OnAsyncWaiterSignaled); 5160 5161 bool owned; 5162 WorkflowServiceInstance instance; 5163 5164 [Fx.Tag.SynchronizationObject(Blocking = false)] 5165 object ThisLock = new object(); 5166 5167 [Fx.Tag.SynchronizationObject] 5168 List<object> waiters; 5169 5170 #if DEBUG 5171 StackTrace exitStack; 5172 #endif 5173 WorkflowExecutionLock(WorkflowServiceInstance instance)5174 public WorkflowExecutionLock(WorkflowServiceInstance instance) 5175 { 5176 this.instance = instance; 5177 } 5178 5179 public bool IsLocked 5180 { 5181 get { return this.owned; } 5182 } 5183 5184 List<object> Waiters 5185 { 5186 get 5187 { 5188 if (waiters == null) 5189 { 5190 
waiters = new List<object>(); 5191 } 5192 5193 return waiters; 5194 } 5195 } 5196 SetupWaiter(ref object token)5197 public void SetupWaiter(ref object token) 5198 { 5199 SetupWaiter(false, ref token); 5200 } 5201 5202 // The token returned here must be fed to all Enter calls 5203 // and finally to CleanupWaiter by the thread that calls 5204 // SetupWaiter. If the enter goes async (such as EnterAsync 5205 // might) then the caller should NOT call cleanup in the async 5206 // callback. SetupWaiter(bool isAbortPriority, ref object token)5207 public void SetupWaiter(bool isAbortPriority, ref object token) 5208 { 5209 lock (ThisLock) 5210 { 5211 try 5212 { 5213 } 5214 finally 5215 { 5216 token = new AsyncWaitHandle(); 5217 5218 if (isAbortPriority) 5219 { 5220 this.Waiters.Insert(0, token); 5221 } 5222 else 5223 { 5224 this.Waiters.Add(token); 5225 } 5226 } 5227 } 5228 } 5229 CleanupWaiter(object token, ref bool ownsLock)5230 public void CleanupWaiter(object token, ref bool ownsLock) 5231 { 5232 if (token != null) 5233 { 5234 lock (ThisLock) 5235 { 5236 if (!this.waiters.Remove(token)) 5237 { 5238 // If it is not in the list that means we've been 5239 // signaled and now own the lock. 
5240 5241 ownsLock = true; 5242 } 5243 } 5244 } 5245 } 5246 Enter(TimeSpan timeout, ref object token, ref bool ownsLock)5247 public void Enter(TimeSpan timeout, ref object token, ref bool ownsLock) 5248 { 5249 Fx.Assert(!ownsLock, "We should never attempt to get the lock if we think we own it."); 5250 5251 if (!TryEnter(timeout, ref token, ref ownsLock)) 5252 { 5253 throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeout))); 5254 } 5255 } 5256 EnterAsync(TimeSpan timeout, ref object token, ref bool ownsLock, FastAsyncCallback callback, object state)5257 public bool EnterAsync(TimeSpan timeout, ref object token, ref bool ownsLock, FastAsyncCallback callback, object state) 5258 { 5259 Fx.Assert(!ownsLock, "We should never attempt to get the lock if we think we own it."); 5260 Fx.Assert(callback != null, "must have a non-null call back for async purposes"); 5261 Fx.Assert(token is AsyncWaitHandle, "The token must be an AsyncWaitHandle."); 5262 5263 AsyncWaitHandle waitHandle = null; 5264 5265 lock (ThisLock) 5266 { 5267 if (!this.owned) 5268 { 5269 try 5270 { 5271 } 5272 finally 5273 { 5274 this.owned = true; 5275 ownsLock = true; 5276 } 5277 5278 return true; 5279 } 5280 5281 waitHandle = (AsyncWaitHandle)token; 5282 } 5283 5284 bool result = false; 5285 5286 if (waitHandle.WaitAsync(asyncWaiterSignaledCallback, new AsyncWaiterData(this, callback, state, waitHandle), timeout)) 5287 { 5288 Fx.Assert(!this.Waiters.Contains(waitHandle), "We should not have this wait handle in the list."); 5289 5290 // Since the waiter is only signaled when they own the lock we won't have 5291 // to set owned to true if this returns true. owned was never set to false 5292 // by Exit in this case. 
5293 5294 ownsLock = true; 5295 result = true; 5296 } 5297 5298 token = null; 5299 return result; 5300 } 5301 OnAsyncWaiterSignaled(object state, TimeoutException asyncException)5302 static void OnAsyncWaiterSignaled(object state, TimeoutException asyncException) 5303 { 5304 AsyncWaiterData asyncWaiter = (AsyncWaiterData)state; 5305 5306 Exception completionException = asyncException; 5307 5308 if (asyncException != null) 5309 { 5310 lock (asyncWaiter.Owner.ThisLock) 5311 { 5312 if (!asyncWaiter.Owner.waiters.Remove(asyncWaiter.Token)) 5313 { 5314 // We raced between timing out and getting signaled. 5315 // We'll take the signal which means we now own the lock 5316 5317 completionException = null; 5318 } 5319 } 5320 } 5321 5322 // Callers of EnterAsync take a null value for the exception to mean 5323 // that they own the lock. Either we were signaled (asyncException was 5324 // null), we got the lock in a ----y way (we nulled the exception when 5325 // we found we weren't in the list), or we don't have the lock (asyncException 5326 // is non-null and we are passing it along). 
5327 asyncWaiter.Callback(asyncWaiter.State, completionException); 5328 } 5329 TryEnter(ref bool ownsLock)5330 public bool TryEnter(ref bool ownsLock) 5331 { 5332 Fx.Assert(!ownsLock, "We should never attempt to get the lock if we think we own it."); 5333 5334 lock (ThisLock) 5335 { 5336 if (!this.owned) 5337 { 5338 try 5339 { 5340 } 5341 finally 5342 { 5343 this.owned = true; 5344 ownsLock = true; 5345 } 5346 5347 return true; 5348 } 5349 5350 return false; 5351 } 5352 } 5353 TryEnter(TimeSpan timeout, ref object token, ref bool ownsLock)5354 public bool TryEnter(TimeSpan timeout, ref object token, ref bool ownsLock) 5355 { 5356 Fx.Assert(!ownsLock, "We should never attempt to get the lock if we think we own it."); 5357 5358 AsyncWaitHandle waiter = EnterCore(ref token, ref ownsLock); 5359 5360 if (waiter != null) 5361 { 5362 Fx.Assert(!ownsLock, "We should not have gotten a waiter if EnterCore gave us the lock."); 5363 5364 if (waiter.Wait(timeout)) 5365 { 5366 ownsLock = true; 5367 token = null; 5368 return true; 5369 } 5370 else 5371 { 5372 // The waiter will be cleaned up by the caller 5373 return false; 5374 } 5375 } 5376 else 5377 { 5378 Fx.Assert(ownsLock, "We didn't have a waiter which means we got the lock."); 5379 return true; 5380 } 5381 } 5382 EnterCore(ref object token, ref bool ownsLock)5383 AsyncWaitHandle EnterCore(ref object token, ref bool ownsLock) 5384 { 5385 AsyncWaitHandle waiter = null; 5386 5387 lock (ThisLock) 5388 { 5389 if (this.owned) 5390 { 5391 if (token == null) 5392 { 5393 waiter = new AsyncWaitHandle(); 5394 this.Waiters.Add(waiter); 5395 } 5396 else 5397 { 5398 waiter = (AsyncWaitHandle)token; 5399 } 5400 } 5401 else 5402 { 5403 try 5404 { 5405 } 5406 finally 5407 { 5408 this.owned = true; 5409 ownsLock = true; 5410 } 5411 } 5412 } 5413 5414 return waiter; 5415 } 5416 5417 // Returns false if the lock was not released, returns true if released. 
Exit(bool keepLockIfNoWaiters, ref bool ownsLock)5418 public bool Exit(bool keepLockIfNoWaiters, ref bool ownsLock) 5419 { 5420 Fx.Assert(ownsLock, "We shouldn't call Exit unless we think we own the lock."); 5421 5422 AsyncWaitHandle waiter = null; 5423 5424 lock (ThisLock) 5425 { 5426 if (!this.owned) 5427 { 5428 string message = InternalSR.InvalidSemaphoreExit; 5429 5430 #if DEBUG 5431 if (!Fx.FastDebug && exitStack != null) 5432 { 5433 string originalStack = exitStack.ToString().Replace("\r\n", "\r\n "); 5434 message = string.Format(CultureInfo.InvariantCulture, 5435 "Object synchronization method was called from an unsynchronized block of code. Previous Exit(): {0}", originalStack); 5436 } 5437 #endif 5438 5439 throw FxTrace.Exception.AsError(new SynchronizationLockException(message)); 5440 } 5441 5442 if (this.waiters == null || this.waiters.Count == 0) 5443 { 5444 if (keepLockIfNoWaiters) 5445 { 5446 return false; 5447 } 5448 else 5449 { 5450 try 5451 { 5452 } 5453 finally 5454 { 5455 this.owned = false; 5456 ownsLock = false; 5457 this.instance.StartUnloadInstancePolicyIfNecessary(); 5458 } 5459 5460 #if DEBUG 5461 if (!Fx.FastDebug) 5462 { 5463 exitStack = new StackTrace(); 5464 } 5465 #endif 5466 5467 return true; 5468 } 5469 } 5470 5471 waiter = (AsyncWaitHandle)this.waiters[0]; 5472 this.waiters.RemoveAt(0); 5473 } 5474 5475 // We're giving up the lock to another thread which now has to 5476 // take care of releasing it 5477 waiter.Set(); 5478 ownsLock = false; 5479 5480 // This counts as a successful exit from the point of view 5481 // of callers of Exit. 
5482 return true; 5483 } 5484 5485 class AsyncWaiterData 5486 { AsyncWaiterData(WorkflowExecutionLock owner, FastAsyncCallback callback, object state, object token)5487 public AsyncWaiterData(WorkflowExecutionLock owner, FastAsyncCallback callback, object state, object token) 5488 { 5489 this.Owner = owner; 5490 this.Callback = callback; 5491 this.State = state; 5492 this.Token = token; 5493 } 5494 5495 public WorkflowExecutionLock Owner 5496 { 5497 get; 5498 private set; 5499 } 5500 5501 public FastAsyncCallback Callback 5502 { 5503 get; 5504 private set; 5505 } 5506 5507 public object State 5508 { 5509 get; 5510 private set; 5511 } 5512 5513 public object Token 5514 { 5515 get; 5516 private set; 5517 } 5518 } 5519 } 5520 5521 class UnhandledExceptionAsyncData 5522 { UnhandledExceptionAsyncData(WorkflowServiceInstance instance, Exception exception, Activity exceptionSource)5523 public UnhandledExceptionAsyncData(WorkflowServiceInstance instance, Exception exception, Activity exceptionSource) 5524 { 5525 this.Instance = instance; 5526 this.Exception = exception; 5527 this.ExceptionSource = exceptionSource; 5528 } 5529 5530 [SuppressMessage(FxCop.Category.Performance, FxCop.Rule.AvoidUncalledPrivateCode, 5531 Justification = "Tracking team is considering to provide the exception source as part of the WorkflowInstanceUnhandledException record")] 5532 public Activity ExceptionSource 5533 { 5534 get; 5535 private set; 5536 } 5537 5538 public WorkflowServiceInstance Instance 5539 { 5540 get; 5541 private set; 5542 } 5543 5544 public Exception Exception 5545 { 5546 get; 5547 private set; 5548 } 5549 } 5550 5551 class WorkflowPersistenceContext 5552 { 5553 WorkflowServiceInstance instance; 5554 CommittableTransaction contextOwnedTransaction; 5555 Transaction clonedTransaction; 5556 WorkflowPersistenceContext(WorkflowServiceInstance instance, bool transactionRequired, Transaction transactionToUse, TimeSpan transactionTimeout)5557 public 
WorkflowPersistenceContext(WorkflowServiceInstance instance, bool transactionRequired, Transaction transactionToUse, TimeSpan transactionTimeout) 5558 { 5559 this.instance = instance; 5560 5561 if (transactionToUse != null) 5562 { 5563 this.clonedTransaction = transactionToUse; 5564 } 5565 else if (transactionRequired) 5566 { 5567 this.contextOwnedTransaction = new CommittableTransaction(transactionTimeout); 5568 // Clone it so that we don't pass a CommittableTransaction to the participants 5569 this.clonedTransaction = this.contextOwnedTransaction.Clone(); 5570 } 5571 } 5572 5573 public Transaction PublicTransaction 5574 { 5575 get 5576 { 5577 return this.clonedTransaction; 5578 } 5579 } 5580 Abort()5581 public void Abort() 5582 { 5583 if (this.contextOwnedTransaction != null) 5584 { 5585 try 5586 { 5587 this.contextOwnedTransaction.Rollback(); 5588 } 5589 catch (Exception e) 5590 { 5591 if (Fx.IsFatal(e)) 5592 { 5593 throw; 5594 } 5595 5596 // ---- these exceptions as we are already on the error path 5597 } 5598 } 5599 } 5600 5601 // Returns true if end needs to be called 5602 // Note: this is side effecting even if it returns false TryBeginComplete(AsyncCallback callback, object state, out IAsyncResult result)5603 public bool TryBeginComplete(AsyncCallback callback, object state, out IAsyncResult result) 5604 { 5605 // In the interest of allocating less objects we don't implement 5606 // the full async pattern here. Instead, we've flattened it to 5607 // do the sync part and then optionally delegate down to the inner 5608 // BeginCommit. 
5609 if (this.contextOwnedTransaction != null) 5610 { 5611 result = this.contextOwnedTransaction.BeginCommit(callback, state); 5612 return true; 5613 } 5614 else 5615 { 5616 result = null; 5617 return false; 5618 } 5619 } 5620 EndComplete(IAsyncResult result)5621 public void EndComplete(IAsyncResult result) 5622 { 5623 Fx.Assert(this.contextOwnedTransaction != null, "We must have a contextOwnedTransaction if we are calling End"); 5624 5625 this.contextOwnedTransaction.EndCommit(result); 5626 } 5627 } 5628 5629 class UnloadInstancePolicyHelper 5630 { 5631 static Action<object> onTimerCallback = new Action<object>(OnTimerCallback); 5632 static AsyncCallback onPersistCallback = Fx.ThunkCallback(new AsyncCallback(PersistCallback)); 5633 static AsyncCallback onUnloadCallback = Fx.ThunkCallback(new AsyncCallback(UnloadCallback)); 5634 static AsyncCallback onUnlockAndAbortCallback = Fx.ThunkCallback(new AsyncCallback(UnlockAndAbortCallback)); 5635 5636 WorkflowServiceInstance instance; 5637 TimeSpan timeToPersist; 5638 TimeSpan timeToUnload; 5639 IOThreadTimer persistTimer; 5640 IOThreadTimer unloadTimer; 5641 bool cancelled; 5642 bool persistEnabled; 5643 bool unloadEnabled; 5644 UnloadInstancePolicyHelper(WorkflowServiceInstance instance, TimeSpan timeToPersist, TimeSpan timeToUnload)5645 public UnloadInstancePolicyHelper(WorkflowServiceInstance instance, TimeSpan timeToPersist, TimeSpan timeToUnload) 5646 { 5647 Fx.Assert(instance != null, String.Empty); 5648 5649 this.instance = instance; 5650 this.timeToPersist = timeToPersist; 5651 this.timeToUnload = timeToUnload; 5652 this.persistEnabled = this.instance.persistenceContext.CanPersist && this.timeToPersist < this.timeToUnload; 5653 this.unloadEnabled = this.instance.persistenceContext.CanPersist && this.timeToUnload < TimeSpan.MaxValue; 5654 5655 if (this.persistEnabled) 5656 { 5657 this.persistTimer = new IOThreadTimer(onTimerCallback, new Action(Persist), true); 5658 } 5659 if (this.unloadEnabled) 5660 { 5661 
this.unloadTimer = new IOThreadTimer(onTimerCallback, new Action(Unload), true); 5662 } 5663 } 5664 5665 [System.Diagnostics.CodeAnalysis.SuppressMessage("Exceptions", "DoNotCatchGeneralExceptionTypes", MessageId = "System.ServiceModel.Activities.WorkflowServiceInstance+UnloadInstancePolicyHelper.OnTimerCallback(System.Object):System.Void", Justification = "The non-fatal exceptions will be traced")] OnTimerCallback(object state)5666 static void OnTimerCallback(object state) 5667 { 5668 try 5669 { 5670 ((Action)state).Invoke(); 5671 } 5672 catch (Exception ex) 5673 { 5674 if (Fx.IsFatal(ex)) 5675 { 5676 throw; 5677 } 5678 FxTrace.Exception.AsWarning(ex); 5679 } 5680 } 5681 Begin()5682 public void Begin() 5683 { 5684 if (this.cancelled) 5685 { 5686 this.cancelled = false; 5687 if (this.persistEnabled) 5688 { 5689 Fx.Assert(this.persistTimer != null, "persistTimer cannot be null if persist is enabled"); 5690 SetTimer(this.persistTimer, this.timeToPersist); 5691 } 5692 else 5693 { 5694 if (this.instance.persistenceContext.CanPersist) 5695 { 5696 if (this.unloadEnabled) 5697 { 5698 Fx.Assert(this.unloadTimer != null, "unloadTimer cannot be null if unload is enabled"); 5699 SetTimer(this.unloadTimer, this.timeToUnload); 5700 } 5701 } 5702 } 5703 } 5704 } 5705 Cancel()5706 public void Cancel() 5707 { 5708 this.cancelled = true; 5709 if (this.persistTimer != null) 5710 { 5711 this.persistTimer.Cancel(); 5712 } 5713 if (this.unloadTimer != null) 5714 { 5715 this.unloadTimer.Cancel(); 5716 } 5717 } 5718 Persist()5719 void Persist() 5720 { 5721 try 5722 { 5723 IAsyncResult result = this.instance.BeginPersist(true, TimeSpan.MaxValue, onPersistCallback, this); 5724 if (result.CompletedSynchronously) 5725 { 5726 HandleEndPersist(result); 5727 } 5728 } 5729 catch (Exception ex) 5730 { 5731 if (Fx.IsFatal(ex)) 5732 { 5733 throw; 5734 } 5735 this.instance.AbortInstance(ex, false); 5736 } 5737 } 5738 PersistCallback(IAsyncResult result)5739 static void PersistCallback(IAsyncResult 
result) 5740 { 5741 if (result.CompletedSynchronously) 5742 { 5743 return; 5744 } 5745 5746 UnloadInstancePolicyHelper thisPtr = (UnloadInstancePolicyHelper)result.AsyncState; 5747 try 5748 { 5749 thisPtr.HandleEndPersist(result); 5750 } 5751 catch (Exception ex) 5752 { 5753 if (Fx.IsFatal(ex)) 5754 { 5755 throw; 5756 } 5757 thisPtr.instance.AbortInstance(ex, false); 5758 } 5759 } 5760 HandleEndPersist(IAsyncResult result)5761 void HandleEndPersist(IAsyncResult result) 5762 { 5763 bool persistSucceeded = this.instance.EndPersist(result); 5764 5765 if (!this.cancelled) 5766 { 5767 if (this.instance.persistenceContext.CanPersist) 5768 { 5769 if (this.unloadEnabled) 5770 { 5771 Fx.Assert(this.unloadTimer != null, "unloadTimer cannot be null if unload is enabled"); 5772 5773 if (persistSucceeded) 5774 { 5775 Fx.Assert(this.timeToUnload > this.timeToPersist, String.Empty); 5776 SetTimer(this.unloadTimer, this.timeToUnload - this.timeToPersist); 5777 } 5778 } 5779 } 5780 } 5781 } 5782 SetTimer(IOThreadTimer timer, TimeSpan ts)5783 void SetTimer(IOThreadTimer timer, TimeSpan ts) 5784 { 5785 Fx.Assert(timer != null && ts >= TimeSpan.Zero, String.Empty); 5786 5787 // It is ok to dirty read the state, the consistency will be ensured by persis/unload itself. 5788 if (this.instance.state == State.Suspended) 5789 { 5790 // Unload/Persist immediately when suspended 5791 timer.Set(TimeSpan.Zero); 5792 } 5793 else 5794 { 5795 timer.Set(ts); 5796 } 5797 } 5798 Unload()5799 void Unload() 5800 { 5801 try 5802 { 5803 if (this.persistEnabled) 5804 { 5805 // This is an optimization to avoid expensive redundant persist (already persisted). 5806 // We will simply Unlock and Abort an instance. 
5807 IAsyncResult result = BeginUnlockAndAbort(TimeSpan.MaxValue, onUnlockAndAbortCallback, this); 5808 if (result.CompletedSynchronously) 5809 { 5810 EndUnlockAndAbort(result); 5811 } 5812 } 5813 else 5814 { 5815 IAsyncResult result = this.instance.BeginReleaseInstance(true, TimeSpan.MaxValue, onUnloadCallback, this); 5816 if (result.CompletedSynchronously) 5817 { 5818 HandleEndUnload(result); 5819 } 5820 } 5821 } 5822 catch (Exception ex) 5823 { 5824 if (Fx.IsFatal(ex)) 5825 { 5826 throw; 5827 } 5828 this.instance.AbortInstance(ex, false); 5829 } 5830 } 5831 UnloadCallback(IAsyncResult result)5832 static void UnloadCallback(IAsyncResult result) 5833 { 5834 if (result.CompletedSynchronously) 5835 { 5836 return; 5837 } 5838 5839 UnloadInstancePolicyHelper thisPtr = (UnloadInstancePolicyHelper)result.AsyncState; 5840 try 5841 { 5842 thisPtr.HandleEndUnload(result); 5843 } 5844 catch (Exception ex) 5845 { 5846 if (Fx.IsFatal(ex)) 5847 { 5848 throw; 5849 } 5850 // 5851 thisPtr.instance.AbortInstance(ex, false); 5852 } 5853 } 5854 HandleEndUnload(IAsyncResult result)5855 void HandleEndUnload(IAsyncResult result) 5856 { 5857 this.instance.EndReleaseInstance(result); 5858 } 5859 BeginUnlockAndAbort(TimeSpan timeout, AsyncCallback callback, object state)5860 IAsyncResult BeginUnlockAndAbort(TimeSpan timeout, AsyncCallback callback, object state) 5861 { 5862 return new UnlockAndAbortAsyncResult(this.instance, timeout, callback, state); 5863 } 5864 EndUnlockAndAbort(IAsyncResult result)5865 void EndUnlockAndAbort(IAsyncResult result) 5866 { 5867 UnlockAndAbortAsyncResult.End(result); 5868 } 5869 UnlockAndAbortCallback(IAsyncResult result)5870 static void UnlockAndAbortCallback(IAsyncResult result) 5871 { 5872 if (result.CompletedSynchronously) 5873 { 5874 return; 5875 } 5876 5877 UnloadInstancePolicyHelper thisPtr = (UnloadInstancePolicyHelper)result.AsyncState; 5878 try 5879 { 5880 thisPtr.EndUnlockAndAbort(result); 5881 } 5882 catch (Exception ex) 5883 { 5884 if 
(Fx.IsFatal(ex)) 5885 { 5886 throw; 5887 } 5888 thisPtr.instance.AbortInstance(ex, false); 5889 } 5890 } 5891 5892 // This class provides a safe unlock and abort of the instance without persisting. 5893 // The synchronized mechanism is the same as ReleaseAsyncResult. 5894 class UnlockAndAbortAsyncResult : AsyncResult 5895 { 5896 static Action<AsyncResult, Exception> onCompleting = new Action<AsyncResult, Exception>(Finally); 5897 static FastAsyncCallback acquireCompletedCallback = new FastAsyncCallback(AcquireCompletedCallback); 5898 static AsyncCompletion handleEndAbandon; 5899 5900 WorkflowServiceInstance instance; 5901 TimeoutHelper timeoutHelper; 5902 bool referenceAcquired; 5903 UnlockAndAbortAsyncResult(WorkflowServiceInstance instance, TimeSpan timeout, AsyncCallback callback, object state)5904 public UnlockAndAbortAsyncResult(WorkflowServiceInstance instance, TimeSpan timeout, AsyncCallback callback, object state) 5905 : base(callback, state) 5906 { 5907 this.instance = instance; 5908 this.timeoutHelper = new TimeoutHelper(timeout); 5909 this.OnCompleting = onCompleting; 5910 5911 Exception completionException = null; 5912 bool completeSelf = true; 5913 5914 if (this.instance.acquireReferenceSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), acquireCompletedCallback, this)) 5915 { 5916 try 5917 { 5918 completeSelf = this.HandleEndAcquireReference(); 5919 } 5920 catch (Exception exception) 5921 { 5922 if (Fx.IsFatal(exception)) 5923 { 5924 throw; 5925 } 5926 completionException = exception; 5927 } 5928 } 5929 else 5930 { 5931 completeSelf = false; 5932 } 5933 5934 if (completeSelf) 5935 { 5936 Complete(true, completionException); 5937 } 5938 } 5939 End(IAsyncResult result)5940 public static void End(IAsyncResult result) 5941 { 5942 AsyncResult.End<UnlockAndAbortAsyncResult>(result); 5943 } 5944 AcquireCompletedCallback(object state, Exception completionException)5945 static void AcquireCompletedCallback(object state, Exception completionException) 5946 
{ 5947 UnlockAndAbortAsyncResult thisPtr = (UnlockAndAbortAsyncResult)state; 5948 5949 bool completeSelf = true; 5950 if (completionException == null) 5951 { 5952 try 5953 { 5954 completeSelf = thisPtr.HandleEndAcquireReference(); 5955 } 5956 catch (Exception exception) 5957 { 5958 if (Fx.IsFatal(exception)) 5959 { 5960 throw; 5961 } 5962 completionException = exception; 5963 } 5964 } 5965 5966 if (completeSelf) 5967 { 5968 thisPtr.Complete(false, completionException); 5969 } 5970 } 5971 HandleEndAcquireReference()5972 bool HandleEndAcquireReference() 5973 { 5974 this.referenceAcquired = true; 5975 5976 if (this.instance.TryReleaseLastReference()) 5977 { 5978 if (handleEndAbandon == null) 5979 { 5980 handleEndAbandon = new AsyncCompletion(HandleEndAbandon); 5981 } 5982 5983 IAsyncResult result = this.instance.BeginAbandon(new FaultException(OperationExecutionFault.CreateAbortedFault(SR.DefaultAbortReason)), false, 5984 this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(handleEndAbandon), this); 5985 return SyncContinue(result); 5986 } 5987 else 5988 { 5989 return true; 5990 } 5991 } 5992 HandleEndAbandon(IAsyncResult result)5993 static bool HandleEndAbandon(IAsyncResult result) 5994 { 5995 UnlockAndAbortAsyncResult thisPtr = (UnlockAndAbortAsyncResult)result.AsyncState; 5996 thisPtr.instance.EndAbandon(result); 5997 5998 return thisPtr.ReleaseAcquiredReference(); 5999 } 6000 ReleaseAcquiredReference()6001 bool ReleaseAcquiredReference() 6002 { 6003 this.instance.acquireReferenceSemaphore.Exit(); 6004 this.referenceAcquired = false; 6005 return true; 6006 } 6007 Finally(AsyncResult result, Exception completionException)6008 static void Finally(AsyncResult result, Exception completionException) 6009 { 6010 UnlockAndAbortAsyncResult thisPtr = (UnlockAndAbortAsyncResult)result; 6011 if (thisPtr.referenceAcquired) 6012 { 6013 thisPtr.ReleaseAcquiredReference(); 6014 } 6015 } 6016 } 6017 } 6018 6019 class UnhandledExceptionPolicyHelper 6020 { 6021 static 
AsyncCallback operationCallback = Fx.ThunkCallback(new AsyncCallback(OperationCallback)); 6022 6023 WorkflowServiceInstance instance; 6024 WorkflowUnhandledExceptionAction action; 6025 UnhandledExceptionPolicyHelper(WorkflowServiceInstance instance, WorkflowUnhandledExceptionAction action)6026 public UnhandledExceptionPolicyHelper(WorkflowServiceInstance instance, WorkflowUnhandledExceptionAction action) 6027 { 6028 Fx.Assert(instance != null, "instance must not be null!"); 6029 Fx.Assert(WorkflowUnhandledExceptionActionHelper.IsDefined(action), action + " is invalid!"); 6030 this.instance = instance; 6031 this.action = action; 6032 } 6033 OnUnhandledException(UnhandledExceptionAsyncData data)6034 public void OnUnhandledException(UnhandledExceptionAsyncData data) 6035 { 6036 Fx.Assert(data != null, "data must not be null!"); 6037 Fx.Assert(data.Exception != null, "data.Exception must not be null!"); 6038 6039 FxTrace.Exception.AsWarning(data.Exception); 6040 6041 try 6042 { 6043 IAsyncResult result; 6044 if (this.action == WorkflowUnhandledExceptionAction.Cancel) 6045 { 6046 result = this.instance.BeginCancel(null, TimeSpan.MaxValue, operationCallback, data); 6047 } 6048 else if (this.action == WorkflowUnhandledExceptionAction.Terminate) 6049 { 6050 result = this.instance.BeginTerminate(data.Exception, null, TimeSpan.MaxValue, operationCallback, data); 6051 } 6052 else if (this.action == WorkflowUnhandledExceptionAction.AbandonAndSuspend) 6053 { 6054 this.instance.isRunnable = false; 6055 // For non-durable WF, simply abandon. 
6056 if (this.instance.persistenceContext.CanPersist) 6057 { 6058 result = this.instance.BeginAbandonAndSuspend(data.Exception, TimeSpan.MaxValue, operationCallback, data); 6059 } 6060 else 6061 { 6062 result = this.instance.BeginAbandon(data.Exception, TimeSpan.MaxValue, operationCallback, data); 6063 } 6064 } 6065 else 6066 { 6067 this.instance.isRunnable = false; 6068 result = this.instance.BeginAbandon(data.Exception, TimeSpan.MaxValue, operationCallback, data); 6069 } 6070 6071 if (result.CompletedSynchronously) 6072 { 6073 HandleEndOperation(result); 6074 } 6075 } 6076 catch (Exception ex) 6077 { 6078 if (Fx.IsFatal(ex)) 6079 { 6080 throw; 6081 } 6082 this.instance.AbortInstance(ex, true); 6083 } 6084 } 6085 OperationCallback(IAsyncResult result)6086 static void OperationCallback(IAsyncResult result) 6087 { 6088 if (result.CompletedSynchronously) 6089 { 6090 return; 6091 } 6092 6093 UnhandledExceptionAsyncData data = (UnhandledExceptionAsyncData)result.AsyncState; 6094 UnhandledExceptionPolicyHelper thisPtr = data.Instance.UnhandledExceptionPolicy; 6095 try 6096 { 6097 thisPtr.HandleEndOperation(result); 6098 } 6099 catch (Exception ex) 6100 { 6101 if (Fx.IsFatal(ex)) 6102 { 6103 throw; 6104 } 6105 thisPtr.instance.AbortInstance(ex, false); 6106 } 6107 } 6108 HandleEndOperation(IAsyncResult result)6109 void HandleEndOperation(IAsyncResult result) 6110 { 6111 if (this.action == WorkflowUnhandledExceptionAction.Cancel) 6112 { 6113 this.instance.EndCancel(result); 6114 } 6115 else if (this.action == WorkflowUnhandledExceptionAction.Terminate) 6116 { 6117 this.instance.EndTerminate(result); 6118 } 6119 else if (this.action == WorkflowUnhandledExceptionAction.AbandonAndSuspend) 6120 { 6121 if (this.instance.persistenceContext.CanPersist) 6122 { 6123 this.instance.EndAbandonAndSuspend(result); 6124 } 6125 else 6126 { 6127 this.instance.EndAbandon(result); 6128 } 6129 } 6130 else 6131 { 6132 this.instance.EndAbandon(result); 6133 } 6134 } 6135 } 6136 } 6137 } 6138