1 using System; 2 using System.Text; 3 using System.Reflection; 4 using System.ComponentModel; 5 using System.Collections; 6 using System.Collections.Generic; 7 using System.Collections.Specialized; 8 using System.Diagnostics; 9 using System.Diagnostics.CodeAnalysis; 10 using System.Xml; 11 using System.Xml.XPath; 12 using System.Xml.Schema; 13 using System.IO; 14 using System.Runtime.Serialization.Formatters.Binary; 15 using System.Threading; 16 using System.Timers; 17 using System.Security.Permissions; 18 using System.Security.Cryptography; 19 20 using System.Workflow.Runtime; 21 using System.Workflow.ComponentModel; 22 using System.Workflow.Runtime.Hosting; 23 using System.Workflow.Runtime.Tracking; 24 25 namespace System.Workflow.Runtime 26 { 27 /// <summary> 28 /// Creates TrackingListener instances 29 /// </summary> 30 internal class TrackingListenerFactory 31 { 32 private List<TrackingService> _services = null; 33 private bool _initialized = false; 34 private Dictionary<Guid, WeakReference> _listeners = new Dictionary<Guid, WeakReference>(); 35 private volatile object _listenerLock = new object(); 36 37 private System.Timers.Timer _timer = null; 38 private double _interval = 60000; 39 40 private TrackingProfileManager _profileManager = new TrackingProfileManager(); 41 TrackingListenerFactory()42 internal TrackingListenerFactory() 43 { 44 } 45 46 internal TrackingProfileManager TrackingProfileManager 47 { 48 get { return _profileManager; } 49 } 50 /// <summary> 51 /// Must be called 52 /// </summary> 53 /// <param name="skedExec"></param> Initialize(WorkflowRuntime runtime)54 internal void Initialize(WorkflowRuntime runtime) 55 { 56 lock (this) 57 { 58 _services = runtime.TrackingServices; 59 _profileManager.Initialize(runtime); 60 runtime.WorkflowExecutorInitializing += WorkflowExecutorInitializing; 61 62 _timer = new System.Timers.Timer(); 63 _timer.Interval = _interval; 64 _timer.AutoReset = false; // ensure that only one timer thread at a time 65 _timer.Elapsed += new ElapsedEventHandler(Cleanup); 66 _timer.Start(); 67 68 _initialized = true; 69 } 70 } 71 72 /// <summary> 73 /// Clean up static state created in Initialize 74 /// </summary> Uninitialize(WorkflowRuntime runtime)75 internal void Uninitialize(WorkflowRuntime runtime) 76 { 77 lock (this) 78 { 79 _profileManager.Uninitialize(); 80 runtime.WorkflowExecutorInitializing -= WorkflowExecutorInitializing; 81 _timer.Elapsed -= new ElapsedEventHandler(Cleanup); 82 _timer.Stop(); 83 _services = null; 84 _initialized = false; 85 86 _timer.Dispose(); 87 _timer = null; 88 } 89 } 90 /// <summary> 91 /// Callback for associating tracking listeners to in memory instances. Fires for new and loading instances. 92 /// </summary> 93 /// <param name="sender">WorkflowExecutor</param> 94 /// <param name="e"></param> WorkflowExecutorInitializing(object sender, WorkflowRuntime.WorkflowExecutorInitializingEventArgs e)95 void WorkflowExecutorInitializing(object sender, WorkflowRuntime.WorkflowExecutorInitializingEventArgs e) 96 { 97 if (null == sender) 98 throw new ArgumentNullException("sender"); 99 100 if (null == e) 101 throw new ArgumentNullException("e"); 102 103 if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) 104 throw new ArgumentException("sender"); 105 106 WorkflowExecutor exec = (WorkflowExecutor)sender; 107 // 108 // Add an event to clean up the WeakRef entry 109 exec.WorkflowExecutionEvent += new EventHandler<WorkflowExecutor.WorkflowExecutionEventArgs>(WorkflowExecutionEvent); 110 TrackingCallingState trackingCallingState = exec.TrackingCallingState; 111 TrackingListenerBroker listenerBroker = (TrackingListenerBroker)exec.RootActivity.GetValue(WorkflowExecutor.TrackingListenerBrokerProperty); 112 if (listenerBroker != null) 113 { 114 listenerBroker.ReplaceServices(exec.WorkflowRuntime.TrackingServiceReplacement); 115 } 116 TrackingListener listener = null; 117 // 118 // Check if we still have a weakref to the listener for this instance 119 WeakReference weakref = null; 120 if (e.Loading) 121 { 122 bool found = false; 123 lock (_listenerLock) 124 { 125 found = _listeners.TryGetValue(exec.InstanceId, out weakref); 126 } 127 if (found) 128 { 129 try 130 { 131 // 132 // Instead of checking IsAlive take a ref to the Target 133 // so that it isn't GC'd underneath us. 134 listener = weakref.Target as TrackingListener; 135 } 136 catch (InvalidOperationException) 137 { 138 // 139 // This seems weird but according to msdn 140 // accessing Target can throw ??? 141 // Ignore because it's the same as a null target. 142 } 143 } 144 // 145 // If listener is null because we didn't find the wr in the cache 146 // or because the Target has been GC'd create a new listener 147 if (null != listener) 148 { 149 listener.Broker = listenerBroker; 150 } 151 else 152 { 153 Debug.Assert(null != listenerBroker, "TrackingListenerBroker should not be null during loading"); 154 listener = GetTrackingListener(exec.WorkflowDefinition, exec, listenerBroker); 155 if (null != listener) 156 { 157 if (null != weakref) 158 weakref.Target = listener; 159 else 160 { 161 lock (_listenerLock) 162 { 163 _listeners.Add(exec.ID, new WeakReference(listener)); 164 } 165 } 166 } 167 } 168 } 169 else 170 { 171 // 172 // New instance is being created 173 listener = GetTrackingListener(exec.WorkflowDefinition, exec); 174 175 if (null != listener) 176 { 177 exec.RootActivity.SetValue(WorkflowExecutor.TrackingListenerBrokerProperty, listener.Broker); 178 lock (_listenerLock) 179 { 180 _listeners.Add(exec.ID, new WeakReference(listener)); 181 } 182 } 183 else 184 exec.RootActivity.SetValue(WorkflowExecutor.TrackingListenerBrokerProperty, new TrackingListenerBroker()); 185 } 186 187 if (null != listener) 188 { 189 exec.WorkflowExecutionEvent += new EventHandler<WorkflowExecutor.WorkflowExecutionEventArgs>(listener.WorkflowExecutionEvent); 190 } 191 192 } 193 WorkflowExecutionEvent(object sender, WorkflowExecutor.WorkflowExecutionEventArgs e)194 void WorkflowExecutionEvent(object sender, WorkflowExecutor.WorkflowExecutionEventArgs e) 195 { 196 switch (e.EventType) 197 { 198 case WorkflowEventInternal.Aborted: 199 case WorkflowEventInternal.Completed: 200 case WorkflowEventInternal.Terminated: 201 // 202 // The instance is done - remove 203 // the WeakRef from our list 204 WorkflowExecutor exec = (WorkflowExecutor)sender; 205 lock (_listenerLock) 206 { 207 _listeners.Remove(exec.ID); 208 } 209 break; 210 default: 211 return; 212 } 213 } 214 Cleanup(object sender, ElapsedEventArgs e)215 void Cleanup(object sender, ElapsedEventArgs e) 216 { 217 List<Guid> _toRemove = new List<Guid>(); 218 if ((null != _listeners) || (_listeners.Count > 0)) 219 { 220 lock (_listenerLock) 221 { 222 foreach (KeyValuePair<Guid, WeakReference> kvp in _listeners) 223 { 224 if (null == kvp.Value.Target) 225 _toRemove.Add(kvp.Key); 226 } 227 if (_toRemove.Count > 0) 228 { 229 foreach (Guid g in _toRemove) 230 _listeners.Remove(g); 231 } 232 } 233 } 234 235 lock (this) 236 { 237 if (_timer != null) 238 { 239 _timer.Start(); 240 } 241 } 242 } 243 244 /// <summary> 245 /// Return a tracking listener for a new instance 246 /// </summary> 247 /// <param name="sked">SequentialWorkflow for which the tracking listener will be associated</param> 248 /// <param name="skedExec">ScheduleExecutor for the schedule instance</param> 249 /// <returns>New TrackingListener instance</returns> GetTrackingListener(Activity sked, WorkflowExecutor skedExec)250 internal TrackingListener GetTrackingListener(Activity sked, WorkflowExecutor skedExec) 251 { 252 if (!_initialized) 253 Initialize(skedExec.WorkflowRuntime); 254 255 return GetListener(sked, skedExec, null); 256 } 257 258 /// <summary> 259 /// Return a tracking listener for an existing instance (normally used during loading) 260 /// </summary> 261 /// <param name="sked">SequentialWorkflow for which the tracking listener will be associated</param> 262 /// <param name="skedExec">ScheduleExecutor for the schedule instance</param> 263 /// <param name="broker">TrackingListenerBroker</param> 264 /// <returns>New TrackingListener instance</returns> GetTrackingListener(Activity sked, WorkflowExecutor skedExec, TrackingListenerBroker broker)265 internal TrackingListener GetTrackingListener(Activity sked, WorkflowExecutor skedExec, TrackingListenerBroker broker) 266 { 267 if (!_initialized) 268 Initialize(skedExec.WorkflowRuntime); 269 270 if (null == broker) 271 { 272 WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullTrackingBroker); 273 return null; 274 } 275 276 return GetListener(sked, skedExec, broker); 277 } 278 GetListenerFromWRCache(Guid instanceId)279 private TrackingListener GetListenerFromWRCache(Guid instanceId) 280 { 281 WeakReference wr = null; 282 TrackingListener listener = null; 283 lock (_listenerLock) 284 { 285 if (!_listeners.TryGetValue(instanceId, out wr)) 286 throw new InvalidOperationException(string.Format(System.Globalization.CultureInfo.InvariantCulture, ExecutionStringManager.ListenerNotInCache, instanceId)); 287 288 listener = wr.Target as TrackingListener; 289 290 if (null == listener) 291 throw new ObjectDisposedException(string.Format(System.Globalization.CultureInfo.InvariantCulture, ExecutionStringManager.ListenerNotInCacheDisposed, instanceId)); 292 } 293 294 return listener; 295 } 296 ReloadProfiles(WorkflowExecutor exec)297 internal void ReloadProfiles(WorkflowExecutor exec) 298 { 299 // Keep control events from other threads out 300 using (new ServiceEnvironment(exec.RootActivity)) 301 { 302 using (exec.ExecutorLock.Enter()) 303 { 304 // check if this is a valid in-memory instance 305 if (!exec.IsInstanceValid) 306 throw new InvalidOperationException(ExecutionStringManager.WorkflowNotValid); 307 308 // suspend the instance 309 bool localSuspend = exec.Suspend(ExecutionStringManager.TrackingProfileUpdate); 310 try 311 { 312 // 313 // Get new profiles 314 TrackingListener listener = GetListenerFromWRCache(exec.InstanceId); 315 listener.ReloadProfiles(exec, exec.InstanceId); 316 } 317 finally 318 { 319 if (localSuspend) 320 { 321 // @undone: for now this will not return till the instance is done 322 // Once Kumar has fixed 4335, we can enable this. 323 exec.Resume(); 324 } 325 } 326 } 327 } 328 } 329 ReloadProfiles(WorkflowExecutor exec, Guid instanceId, ref TrackingListenerBroker broker, ref List<TrackingChannelWrapper> channels)330 internal void ReloadProfiles(WorkflowExecutor exec, Guid instanceId, ref TrackingListenerBroker broker, ref List<TrackingChannelWrapper> channels) 331 { 332 Type workflowType = exec.WorkflowDefinition.GetType(); 333 // 334 // Ask every tracking service if they want to reload 335 // even if they originally returned null for a profile 336 foreach (TrackingService service in _services) 337 { 338 TrackingProfile profile = null; 339 TrackingChannelWrapper w = null; 340 // 341 // Check if the service wants to reload a profile 342 if (service.TryReloadProfile(workflowType, instanceId, out profile)) 343 { 344 bool found = false; 345 int i; 346 for (i = 0; i < channels.Count; i++) 347 { 348 if (service.GetType() == channels[i].TrackingServiceType) 349 { 350 w = channels[i]; 351 found = true; 352 break; 353 } 354 } 355 // 356 // If we don't have a profile, remove what we had for this service type (if anything) 357 if (null == profile) 358 { 359 if (found) 360 { 361 broker.RemoveService(w.TrackingServiceType); 362 channels.RemoveAt(i); 363 } 364 continue; 365 } 366 // 367 // Parse the new profile - instance only, the cache is not involved 368 RTTrackingProfile rtp = new RTTrackingProfile(profile, exec.WorkflowDefinition, workflowType); 369 rtp.IsPrivate = true; 370 371 if (!found) 372 { 373 // 374 // This is a new profile, create new channel, channelwrapper and broker item 375 List<string> activityCallPath = null; 376 Guid callerInstanceId = Guid.Empty; 377 TrackingCallingState trackingCallingState = exec.TrackingCallingState; 378 Debug.Assert((null != trackingCallingState), "WorkflowState is null"); 379 IList<string> path = null; 380 Guid context = GetContext(exec.RootActivity), callerContext = Guid.Empty, callerParentContext = Guid.Empty; 381 // 382 // Use CallerActivityPathProxy to determine if this is an invoked instance 383 if (trackingCallingState != null) 384 { 385 path = trackingCallingState.CallerActivityPathProxy; 386 if ((null != path) && (path.Count > 0)) 387 { 388 activityCallPath = new List<string>(path); 389 390 Debug.Assert(Guid.Empty != trackingCallingState.CallerWorkflowInstanceId, "Instance has an ActivityCallPath but CallerInstanceId is empty"); 391 callerInstanceId = trackingCallingState.CallerWorkflowInstanceId; 392 393 callerContext = trackingCallingState.CallerContextGuid; 394 callerParentContext = trackingCallingState.CallerParentContextGuid; 395 } 396 } 397 398 TrackingParameters tp = new TrackingParameters(instanceId, workflowType, exec.WorkflowDefinition, activityCallPath, callerInstanceId, context, callerContext, callerParentContext); 399 TrackingChannel channel = service.GetTrackingChannel(tp); 400 401 TrackingChannelWrapper wrapper = new TrackingChannelWrapper(channel, service.GetType(), workflowType, rtp); 402 channels.Add(wrapper); 403 404 Type t = service.GetType(); 405 broker.AddService(t, rtp.Version); 406 broker.MakeProfileInstance(t); 407 } 408 else 409 { 410 // 411 // Don't need to call MakeProfilePrivate on the wrapper 412 // because we've already marked it as private and we already 413 // have a private copy of it. 414 //w.MakeProfilePrivate( exec ); 415 w.SetTrackingProfile(rtp); 416 broker.MakeProfileInstance(w.TrackingServiceType); 417 } 418 } 419 } 420 } 421 GetContext(Activity activity)422 internal Guid GetContext(Activity activity) 423 { 424 return ((ActivityExecutionContextInfo)ContextActivityUtils.ContextActivity(activity).GetValue(Activity.ActivityExecutionContextInfoProperty)).ContextGuid; 425 } 426 GetListener(Activity sked, WorkflowExecutor skedExec, TrackingListenerBroker broker)427 private TrackingListener GetListener(Activity sked, WorkflowExecutor skedExec, TrackingListenerBroker broker) 428 { 429 if ((null == sked) || (null == skedExec)) 430 { 431 WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullParameters); 432 return null; 433 } 434 435 if ((null == _services) || (_services.Count <= 0)) 436 return null; 437 438 bool load = (null != broker); 439 440 List<TrackingChannelWrapper> channels = GetChannels(sked, skedExec, skedExec.InstanceId, sked.GetType(), ref broker); 441 442 if ((null == channels) || (0 == channels.Count)) 443 return null; 444 445 return new TrackingListener(this, sked, skedExec, channels, broker, load); 446 } 447 GetChannels(Activity schedule, WorkflowExecutor exec, Guid instanceID, Type workflowType, ref TrackingListenerBroker broker)448 private List<TrackingChannelWrapper> GetChannels(Activity schedule, WorkflowExecutor exec, Guid instanceID, Type workflowType, ref TrackingListenerBroker broker) 449 { 450 if (null == _services) 451 return null; 452 453 bool initBroker = false; 454 if (null == broker) 455 { 456 broker = new TrackingListenerBroker(); 457 initBroker = true; 458 } 459 460 List<TrackingChannelWrapper> channels = new List<TrackingChannelWrapper>(); 461 462 List<string> activityCallPath = null; 463 Guid callerInstanceId = Guid.Empty; 464 Guid context = GetContext(exec.RootActivity), callerContext = Guid.Empty, callerParentContext = Guid.Empty; 465 466 Debug.Assert(exec is WorkflowExecutor, "Executor is not WorkflowExecutor"); 467 TrackingCallingState trackingCallingState = exec.TrackingCallingState; 468 TrackingListenerBroker trackingListenerBroker = (TrackingListenerBroker)exec.RootActivity.GetValue(WorkflowExecutor.TrackingListenerBrokerProperty); 469 IList<string> path = trackingCallingState != null ? trackingCallingState.CallerActivityPathProxy : null; 470 // 471 // Use CallerActivityPathProxy to determine if this is an invoked instance 472 if ((null != path) && (path.Count > 0)) 473 { 474 activityCallPath = new List<string>(path); 475 476 Debug.Assert(Guid.Empty != trackingCallingState.CallerWorkflowInstanceId, "Instance has an ActivityCallPath but CallerInstanceId is empty"); 477 callerInstanceId = trackingCallingState.CallerWorkflowInstanceId; 478 479 callerContext = trackingCallingState.CallerContextGuid; 480 callerParentContext = trackingCallingState.CallerParentContextGuid; 481 } 482 483 TrackingParameters parameters = new TrackingParameters(instanceID, workflowType, exec.WorkflowDefinition, activityCallPath, callerInstanceId, context, callerContext, callerParentContext); 484 485 for (int i = 0; i < _services.Count; i++) 486 { 487 TrackingChannel channel = null; 488 Type serviceType = _services[i].GetType(); 489 490 // 491 // See if the service has a profile for this schedule type 492 // If not we don't do any tracking for the service 493 // 494 RTTrackingProfile profile = null; 495 496 // 497 // If we've created the broker get the current version of the profile 498 if (initBroker) 499 { 500 profile = _profileManager.GetProfile(_services[i], schedule); 501 502 if (null == profile) 503 continue; 504 505 broker.AddService(serviceType, profile.Version); 506 } 507 else 508 { 509 // 510 // Only reload the services that are in the broker 511 // If services that weren't originally associated to an instance 512 // wish to join that instance they should call ReloadTrackingProfiles 513 if (!broker.ContainsService(serviceType)) 514 continue; 515 516 if (broker.IsProfileInstance(serviceType)) 517 { 518 profile = _profileManager.GetProfile(_services[i], schedule, instanceID); 519 520 if (null == profile) 521 throw new InvalidOperationException(ExecutionStringManager.MissingProfileForService + serviceType.ToString()); 522 523 profile.IsPrivate = true; 524 } 525 else 526 { 527 Version versionId; 528 if (broker.TryGetProfileVersionId(serviceType, out versionId)) 529 { 530 profile = _profileManager.GetProfile(_services[i], schedule, versionId); 531 532 if (null == profile) 533 throw new InvalidOperationException(ExecutionStringManager.MissingProfileForService + serviceType.ToString() + ExecutionStringManager.MissingProfileForVersion + versionId.ToString()); 534 // 535 // If the profile is marked as private clone the instance we got from the cache 536 // The cloned instance is marked as private during the cloning 537 if (broker.IsProfilePrivate(serviceType)) 538 { 539 profile = profile.Clone(); 540 profile.IsPrivate = true; 541 } 542 } 543 else 544 continue; 545 } 546 } 547 548 // 549 // If profile is not null get a channel 550 channel = _services[i].GetTrackingChannel(parameters); 551 552 if (null == channel) 553 throw new InvalidOperationException(ExecutionStringManager.NullChannel); 554 555 channels.Add(new TrackingChannelWrapper(channel, _services[i].GetType(), workflowType, profile)); 556 } 557 558 return channels; 559 } 560 } 561 562 /// <summary> 563 /// Handles subscribing to status change events and receiving event notifications. 564 /// </summary> 565 internal class TrackingListener 566 { 567 private List<TrackingChannelWrapper> _channels = null; 568 private TrackingListenerBroker _broker = null; 569 private TrackingListenerFactory _factory = null; 570 TrackingListener()571 protected TrackingListener() 572 { 573 } 574 TrackingListener(TrackingListenerFactory factory, Activity sked, WorkflowExecutor exec, List<TrackingChannelWrapper> channels, TrackingListenerBroker broker, bool load)575 internal TrackingListener(TrackingListenerFactory factory, Activity sked, WorkflowExecutor exec, List<TrackingChannelWrapper> channels, TrackingListenerBroker broker, bool load) 576 { 577 if ((null == sked) || (null == broker)) 578 { 579 WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullParameters); 580 return; 581 } 582 _factory = factory; 583 _channels = channels; 584 // 585 // Keep a reference to the broker so that we can hand it out when adding subscriptions 586 _broker = broker; 587 // 588 // Give the broker our reference so that it can call us back on behalf of subscriptions 589 _broker.TrackingListener = this; 590 } 591 592 internal TrackingListenerBroker Broker 593 { 594 get { return _broker; } 595 set { _broker = value; } 596 } 597 ReloadProfiles(WorkflowExecutor exec, Guid instanceId)598 internal void ReloadProfiles(WorkflowExecutor exec, Guid instanceId) 599 { 600 // 601 // Ask the factory to redo the channels and broker 602 _factory.ReloadProfiles(exec, instanceId, ref _broker, ref _channels); 603 } 604 605 606 #region Event Handlers 607 ActivityStatusChange(object sender, WorkflowExecutor.ActivityStatusChangeEventArgs e)608 internal void ActivityStatusChange(object sender, WorkflowExecutor.ActivityStatusChangeEventArgs e) 609 { 610 WorkflowTrace.Tracking.TraceInformation("TrackingListener::ActivityStatusChange - Received Activity Status Change Event for activity {0}", e.Activity.QualifiedName); 611 612 if (null == sender) 613 throw new ArgumentNullException("sender"); 614 615 if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) 616 throw new ArgumentException("sender"); 617 618 if (null == e) 619 throw new ArgumentNullException("e"); 620 621 WorkflowExecutor exec = (WorkflowExecutor)sender; 622 623 if ((null == _channels) || (_channels.Count <= 0)) 624 { 625 WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NoChannels); 626 return; 627 } 628 629 Activity activity = e.Activity; 630 631 if (!SubscriptionRequired(activity, exec)) 632 return; 633 // 634 // Get the shared data that is the same for each tracking channel that gets a record 635 Guid parentContextGuid = Guid.Empty, contextGuid = Guid.Empty; 636 GetContext(activity, exec, out contextGuid, out parentContextGuid); 637 638 DateTime dt = DateTime.UtcNow; 639 int eventOrderId = _broker.GetNextEventOrderId(); 640 641 foreach (TrackingChannelWrapper wrapper in _channels) 642 { 643 // 644 // Create a record for each tracking channel 645 // Each channel gets a distinct record because extract data will almost always be different. 646 ActivityTrackingRecord record = new ActivityTrackingRecord(activity.GetType(), activity.QualifiedName, contextGuid, parentContextGuid, activity.ExecutionStatus, dt, eventOrderId, null); 647 648 bool extracted = wrapper.GetTrackingProfile(exec).TryTrackActivityEvent(activity, activity.ExecutionStatus, exec, record); 649 // 650 // Only send the record to the channel if the profile indicates that it is interested 651 // This doesn't mean that the Body will always have data in it, 652 // it may be an empty extraction (just header info) 653 if (extracted) 654 wrapper.TrackingChannel.Send(record); 655 } 656 } 657 UserTrackPoint(object sender, WorkflowExecutor.UserTrackPointEventArgs e)658 internal void UserTrackPoint(object sender, WorkflowExecutor.UserTrackPointEventArgs e) 659 { 660 if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) 661 throw new ArgumentException("sender is not WorkflowExecutor"); 662 663 WorkflowExecutor exec = (WorkflowExecutor)sender; 664 Activity activity = e.Activity; 665 666 DateTime dt = DateTime.UtcNow; 667 int eventOrderId = _broker.GetNextEventOrderId(); 668 669 Guid parentContextGuid, contextGuid; 670 GetContext(activity, exec, out contextGuid, out parentContextGuid); 671 672 foreach (TrackingChannelWrapper wrapper in _channels) 673 { 674 UserTrackingRecord record = new UserTrackingRecord(activity.GetType(), activity.QualifiedName, contextGuid, parentContextGuid, dt, eventOrderId, e.Key, e.Args); 675 676 if (wrapper.GetTrackingProfile(exec).TryTrackUserEvent(activity, e.Key, e.Args, exec, record)) 677 wrapper.TrackingChannel.Send(record); 678 } 679 } 680 WorkflowExecutionEvent(object sender, WorkflowExecutor.WorkflowExecutionEventArgs e)681 internal void WorkflowExecutionEvent(object sender, WorkflowExecutor.WorkflowExecutionEventArgs e) 682 { 683 if (null == sender) 684 throw new ArgumentNullException("sender"); 685 686 WorkflowExecutor exec = sender as WorkflowExecutor; 687 if (null == exec) 688 throw new ArgumentException(ExecutionStringManager.InvalidSenderWorkflowExecutor); 689 // 690 // Many events are mapped "forward" and sent to tracking services 691 // (Persisting->Persisted, SchedulerEmpty->Idle) 692 // This is so that a batch is always available when a tracking service gets an event. 693 // Without this tracking data could be inconsistent with the state of the instance. 694 switch (e.EventType) 695 { 696 case WorkflowEventInternal.Creating: 697 NotifyChannels(TrackingWorkflowEvent.Created, e, exec); 698 return; 699 case WorkflowEventInternal.Starting: 700 NotifyChannels(TrackingWorkflowEvent.Started, e, exec); 701 return; 702 case WorkflowEventInternal.Suspending: 703 NotifyChannels(TrackingWorkflowEvent.Suspended, e, exec); 704 return; 705 case WorkflowEventInternal.Resuming: 706 NotifyChannels(TrackingWorkflowEvent.Resumed, e, exec); 707 return; 708 case WorkflowEventInternal.Persisting: 709 NotifyChannels(TrackingWorkflowEvent.Persisted, e, exec); 710 return; 711 case WorkflowEventInternal.Unloading: 712 NotifyChannels(TrackingWorkflowEvent.Unloaded, e, exec); 713 return; 714 case WorkflowEventInternal.Loading: 715 NotifyChannels(TrackingWorkflowEvent.Loaded, e, exec); 716 return; 717 case WorkflowEventInternal.Completing: 718 NotifyChannels(TrackingWorkflowEvent.Completed, e, exec); 719 NotifyChannelsOfCompletionOrTermination(); 720 return; 721 case WorkflowEventInternal.Aborting: 722 NotifyChannels(TrackingWorkflowEvent.Aborted, e, exec); 723 return; 724 case WorkflowEventInternal.Terminating: 725 NotifyChannels(TrackingWorkflowEvent.Terminated, e, exec); 726 NotifyChannelsOfCompletionOrTermination(); 727 return; 728 case WorkflowEventInternal.Exception: 729 NotifyChannels(TrackingWorkflowEvent.Exception, e, exec); 730 return; 731 case WorkflowEventInternal.SchedulerEmpty: 732 NotifyChannels(TrackingWorkflowEvent.Idle, e, exec); 733 return; 734 case WorkflowEventInternal.UserTrackPoint: 735 UserTrackPoint(exec, (WorkflowExecutor.UserTrackPointEventArgs)e); 736 return; 737 case WorkflowEventInternal.ActivityStatusChange: 738 ActivityStatusChange(exec, (WorkflowExecutor.ActivityStatusChangeEventArgs)e); 739 return; 740 case WorkflowEventInternal.DynamicChangeBegin: 741 DynamicUpdateBegin(exec, (WorkflowExecutor.DynamicUpdateEventArgs)e); 742 return; 743 case WorkflowEventInternal.DynamicChangeRollback: 744 DynamicUpdateRollback(exec, (WorkflowExecutor.DynamicUpdateEventArgs)e); 745 return; 746 case WorkflowEventInternal.DynamicChangeCommit: 747 DynamicUpdateCommit(exec, (WorkflowExecutor.DynamicUpdateEventArgs)e); 748 return; 749 default: 750 return; 751 } 752 } 753 DynamicUpdateBegin(object sender, WorkflowExecutor.DynamicUpdateEventArgs e)754 internal void DynamicUpdateBegin(object sender, WorkflowExecutor.DynamicUpdateEventArgs e) 755 { 756 if (null == sender) 757 throw new ArgumentNullException("sender"); 758 759 if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) 760 throw new ArgumentException("sender"); 761 762 WorkflowExecutor exec = (WorkflowExecutor)sender; 763 // 764 // WorkflowChangeEventArgs may be null or the WorkflowChanges may be null or empty 765 // If so there's no work to do here 766 if (null == e.ChangeActions) 767 return; 768 // 769 // Clone the profiles to create instance specific copies (if they aren't already) 770 MakeProfilesPrivate(exec); 771 // 772 // Give the profiles the changes. At this point we are in a volatile state. 773 // Profiles must act as if the changes will succeed but roll back any internal changes if they do not. 774 foreach (TrackingChannelWrapper channel in _channels) 775 { 776 channel.GetTrackingProfile(exec).WorkflowChangeBegin(e.ChangeActions); 777 } 778 } 779 DynamicUpdateRollback(object sender, WorkflowExecutor.DynamicUpdateEventArgs e)780 internal void DynamicUpdateRollback(object sender, WorkflowExecutor.DynamicUpdateEventArgs e) 781 { 782 if (null == sender) 783 throw new ArgumentNullException("sender"); 784 785 if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) 786 throw new ArgumentException("sender"); 787 788 WorkflowExecutor exec = (WorkflowExecutor)sender; 789 790 foreach (TrackingChannelWrapper channel in _channels) 791 { 792 channel.GetTrackingProfile(exec).WorkflowChangeRollback(); 793 } 794 } 795 DynamicUpdateCommit(object sender, WorkflowExecutor.DynamicUpdateEventArgs e)796 internal void DynamicUpdateCommit(object sender, WorkflowExecutor.DynamicUpdateEventArgs e) 797 { 798 if (null == sender) 799 throw new ArgumentNullException("sender"); 800 801 if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) 802 throw new ArgumentException("sender"); 803 804 WorkflowExecutor exec = (WorkflowExecutor)sender; 805 806 DateTime dt = DateTime.UtcNow; 807 foreach (TrackingChannelWrapper channel in _channels) 808 { 809 channel.GetTrackingProfile(exec).WorkflowChangeCommit(); 810 } 811 // 812 // Notify tracking channels of changes 813 int eventOrderId = _broker.GetNextEventOrderId(); 814 815 foreach (TrackingChannelWrapper wrapper in _channels) 816 { 817 WorkflowTrackingRecord rec = new WorkflowTrackingRecord(TrackingWorkflowEvent.Changed, dt, eventOrderId, new TrackingWorkflowChangedEventArgs(e.ChangeActions, exec.WorkflowDefinition)); 818 if (wrapper.GetTrackingProfile(exec).TryTrackInstanceEvent(TrackingWorkflowEvent.Changed, rec)) 819 wrapper.TrackingChannel.Send(rec); 820 } 821 } 822 823 #endregion Event Handlers 824 825 #region Private Methods 826 NotifyChannels(TrackingWorkflowEvent evt, WorkflowExecutor.WorkflowExecutionEventArgs e, WorkflowExecutor exec)827 private void NotifyChannels(TrackingWorkflowEvent evt, WorkflowExecutor.WorkflowExecutionEventArgs e, WorkflowExecutor exec) 828 { 829 DateTime dt = DateTime.UtcNow; 830 int eventOrderId = _broker.GetNextEventOrderId(); 831 832 foreach (TrackingChannelWrapper wrapper in _channels) 833 { 834 EventArgs args = null; 835 switch (evt) 836 { 837 case TrackingWorkflowEvent.Suspended: 838 args = new TrackingWorkflowSuspendedEventArgs(((WorkflowExecutor.WorkflowExecutionSuspendingEventArgs)e).Error); 839 break; 840 case TrackingWorkflowEvent.Terminated: 841 WorkflowExecutor.WorkflowExecutionTerminatingEventArgs wtea = (WorkflowExecutor.WorkflowExecutionTerminatingEventArgs)e; 842 if (null != wtea.Exception) 843 args = new TrackingWorkflowTerminatedEventArgs(wtea.Exception); 844 else 845 args = new TrackingWorkflowTerminatedEventArgs(wtea.Error); 846 break; 847 case TrackingWorkflowEvent.Exception: 848 WorkflowExecutor.WorkflowExecutionExceptionEventArgs weea = (WorkflowExecutor.WorkflowExecutionExceptionEventArgs)e; 849 args = new TrackingWorkflowExceptionEventArgs(weea.Exception, weea.CurrentPath, weea.OriginalPath, weea.ContextGuid, weea.ParentContextGuid); 850 break; 851 } 852 WorkflowTrackingRecord rec = new WorkflowTrackingRecord(evt, dt, eventOrderId, args); 853 if (wrapper.GetTrackingProfile(exec).TryTrackInstanceEvent(evt, rec)) 854 wrapper.TrackingChannel.Send(rec); 855 } 856 } 857 NotifyChannelsOfCompletionOrTermination()858 private void NotifyChannelsOfCompletionOrTermination() 859 { 860 foreach (TrackingChannelWrapper wrapper in _channels) 861 wrapper.TrackingChannel.InstanceCompletedOrTerminated(); 862 } 863 GetContext(Activity activity, WorkflowExecutor exec, out Guid contextGuid, out Guid parentContextGuid)864 private void GetContext(Activity activity, WorkflowExecutor exec, out Guid contextGuid, out Guid parentContextGuid) 865 { 866 contextGuid = _factory.GetContext(activity); 867 868 if (null != activity.Parent) 869 parentContextGuid = _factory.GetContext(activity.Parent); 870 else 871 parentContextGuid = contextGuid; 872 873 Debug.Assert(contextGuid != Guid.Empty, "TrackingContext is empty"); 874 Debug.Assert(parentContextGuid != Guid.Empty, "Parent TrackingContext is empty"); 875 } 876 877 /// <summary> 878 /// Clone all profiles to create private versions in order to hold subscriptions for dynamic changes 879 /// </summary> MakeProfilesPrivate(WorkflowExecutor exec)880 private void MakeProfilesPrivate(WorkflowExecutor exec) 881 { 882 foreach (TrackingChannelWrapper channel in _channels) 883 { 884 channel.MakeProfilePrivate(exec); 885 _broker.MakeProfilePrivate(channel.TrackingServiceType); 886 } 887 } 888 /// <summary> 889 /// Determine if subscriptions are needed 890 /// </summary> 891 /// <param name="activity">Activity for which to check subscription needs</param> 892 /// <returns></returns> SubscriptionRequired(Activity activity, WorkflowExecutor exec)893 private bool SubscriptionRequired(Activity activity, WorkflowExecutor exec) 894 { 895 // 896 // Give each channel a chance to prep itself 897 bool needed = false; 898 899 foreach (TrackingChannelWrapper channel in _channels) 900 { 901 if ((channel.GetTrackingProfile(exec).ActivitySubscriptionNeeded(activity)) && (!needed)) 902 needed = true; 903 } 904 905 return needed; 906 } 907 908 #endregion 909 } 910 911 /// <summary> 912 /// This is a lightweight class that is serialized so that the TrackingListener doesn't have to be. 913 /// Every subscription that the listener adds holds a reference to this class. 914 /// When an instance is loaded the broker is given to the listener factory and the listener factory 915 /// gives the broker the new listener. This saves us from having to persist the listener itself which 916 /// means that while we do need to persist a list of service types and their profile version we don't 917 /// have to persist the channels themselves (and we can't control how heavy channels get as they are host defined). 918 /// </summary> 919 [Serializable] 920 internal class TrackingListenerBroker : System.Runtime.Serialization.ISerializable 921 { 922 [NonSerialized] 923 private TrackingListener _listener = null; 924 private int _eventOrderId = 0; 925 private Dictionary<Guid, ServiceProfileContainer> _services = new Dictionary<Guid, ServiceProfileContainer>(); 926 TrackingListenerBroker()927 internal TrackingListenerBroker() 928 { 929 } 930 TrackingListenerBroker(TrackingListener listener)931 internal TrackingListenerBroker(TrackingListener listener) 932 { 933 _listener = listener; 934 } 935 936 internal TrackingListener TrackingListener 937 { 938 // 939 // FxCops minbar complains because this isn't used. 940 // The Setter is required; seems weird not to have a getter. 941 //get { return _listener; } 942 set { _listener = value; } 943 } 944 ContainsService(Type trackingServiceType)945 internal bool ContainsService(Type trackingServiceType) 946 { 947 return _services.ContainsKey(HashHelper.HashServiceType(trackingServiceType)); 948 } 949 AddService(Type trackingServiceType, Version profileVersionId)950 internal void AddService(Type trackingServiceType, Version profileVersionId) 951 { 952 _services.Add(HashHelper.HashServiceType(trackingServiceType), new ServiceProfileContainer(profileVersionId)); 953 } 954 ReplaceServices(Dictionary<string, Type> replacements)955 internal void ReplaceServices(Dictionary<string, Type> replacements) 956 { 957 if (replacements != null && replacements.Count > 0) 958 { 959 ServiceProfileContainer item; 960 foreach (KeyValuePair<string, Type> replacement in replacements) 961 { 962 Guid previous = HashHelper.HashServiceType(replacement.Key); 963 if (_services.TryGetValue(previous, out item)) 964 { 965 _services.Remove(previous); 966 Guid current = HashHelper.HashServiceType(replacement.Value); 967 if (!_services.ContainsKey(current)) 968 { 969 _services.Add(current, item); 970 } 971 } 972 } 973 } 974 } 975 RemoveService(Type trackingServiceType)976 internal void RemoveService(Type trackingServiceType) 977 { 978 _services.Remove(HashHelper.HashServiceType(trackingServiceType)); 979 } 980 TryGetProfileVersionId(Type trackingServiceType, out Version profileVersionId)981 internal bool TryGetProfileVersionId(Type trackingServiceType, out Version profileVersionId) 982 { 983 profileVersionId = new Version(0, 0); 984 985 ServiceProfileContainer service = null; 986 if (_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) 987 { 988 profileVersionId = service.ProfileVersionId; 989 return true; 990 } 991 return false; 992 } 993 MakeProfilePrivate(Type trackingServiceType)994 internal void MakeProfilePrivate(Type trackingServiceType) 995 { 996 ServiceProfileContainer service = null; 997 if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) 998 throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); 999 1000 service.IsPrivate = true; 1001 } 1002 IsProfilePrivate(Type trackingServiceType)1003 internal bool IsProfilePrivate(Type trackingServiceType) 1004 { 1005 ServiceProfileContainer service = null; 1006 if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) 1007 throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); 1008 1009 return service.IsPrivate; 1010 } 1011 MakeProfileInstance(Type trackingServiceType)1012 internal void MakeProfileInstance(Type trackingServiceType) 1013 { 1014 ServiceProfileContainer service = null; 1015 if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) 1016 throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); 1017 // 1018 // Can't be instance without being private 1019 service.IsPrivate = true; 1020 service.IsInstance = true; 1021 } 1022 IsProfileInstance(Type trackingServiceType)1023 internal bool IsProfileInstance(Type trackingServiceType) 1024 { 1025 ServiceProfileContainer service = null; 1026 if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) 1027 throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); 1028 1029 return service.IsInstance; 1030 } 1031 GetNextEventOrderId()1032 internal int GetNextEventOrderId() 1033 { 1034 checked 1035 { 1036 return ++_eventOrderId; 1037 } 1038 } 1039 1040 [Serializable] 1041 internal class ServiceProfileContainer 1042 { 1043 Version _profileVersionId = new Version(0, 0); 1044 bool _isPrivate = false; 1045 bool _isInstance = false; 1046 ServiceProfileContainer()1047 protected ServiceProfileContainer() { } 1048 ServiceProfileContainer(Version profileVersionId)1049 internal ServiceProfileContainer(Version profileVersionId) 1050 { 1051 _profileVersionId = profileVersionId; 1052 } 1053 1054 internal Version ProfileVersionId 1055 { 1056 get { return _profileVersionId; } 1057 } 1058 1059 internal bool IsPrivate 1060 { 1061 get { return _isPrivate; } 1062 set { _isPrivate = value; } 1063 } 1064 1065 internal bool IsInstance 1066 { 1067 get { return _isInstance; } 1068 set { _isInstance = value; } 1069 } 1070 } 1071 1072 #region ISerializable Members 1073 1074 [SecurityPermissionAttribute(SecurityAction.Demand, SerializationFormatter = true)] GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)1075 public void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) 1076 { 1077 info.AddValue("eventOrderId", this._eventOrderId); 1078 info.AddValue("services", this._services.Count == 0 ? null : this._services); 1079 } TrackingListenerBroker(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)1080 private TrackingListenerBroker(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) 1081 { 1082 this._eventOrderId = info.GetInt32("eventOrderId"); 1083 this._services = (Dictionary<Guid, ServiceProfileContainer>)info.GetValue("services", typeof(Dictionary<Guid, ServiceProfileContainer>)); 1084 if (this._services == null) 1085 this._services = new Dictionary<Guid, ServiceProfileContainer>(); 1086 } 1087 1088 #endregion 1089 } 1090 1091 /// <summary> 1092 /// Manages profile requests, caching profiles and creating RTTrackingProfile instances. 1093 /// </summary> 1094 internal class TrackingProfileManager 1095 { 1096 // 1097 // This is a dictionary keyed by tracking service type 1098 // that returns a dictionary that is key by schedule type 1099 // that returns a Set of profile versions for that schedule type 1100 // The set is constrained by VersionId 1101 private Dictionary<Type, Dictionary<Type, ProfileList>> _cacheLookup; 1102 // 1103 // Protects _cacheLookup 1104 private object _cacheLock = new object(); 1105 // 1106 // Values assigned in Initialize 1107 private bool _init = false; 1108 private List<TrackingService> _services = null; 1109 private WorkflowRuntime _runtime = null; 1110 TrackingProfileManager()1111 internal TrackingProfileManager() 1112 { 1113 } 1114 /// <summary> 1115 /// Clears all entries from the cache by reinitializing the member 1116 /// </summary> ClearCache()1117 public static void ClearCache() 1118 { 1119 WorkflowRuntime.ClearTrackingProfileCache(); 1120 } 1121 ClearCacheImpl()1122 internal void ClearCacheImpl() 1123 { 1124 lock (_cacheLock) 1125 { 1126 _cacheLookup = new Dictionary<Type, Dictionary<Type, ProfileList>>(); 1127 } 1128 } 1129 /// <summary> 1130 /// Create static state 1131 /// </summary> 1132 /// <param name="runtime"></param> Initialize(WorkflowRuntime runtime)1133 internal void Initialize(WorkflowRuntime runtime) 1134 { 1135 lock (_cacheLock) 1136 { 1137 if (null == runtime) 1138 throw new ArgumentException(ExecutionStringManager.NullEngine); 1139 1140 _runtime = runtime; 1141 // 1142 // Initialize the cache 1143 // Do this every time the runtime starts/stops to make life easier 1144 // for IProfileNotification tracking services that might have updated 1145 // profiles while we were stopped - we'll go get new versions since nothing is cached 1146 // without them having to fire updated events. 1147 _cacheLookup = new Dictionary<Type, Dictionary<Type, ProfileList>>(); 1148 if (null != runtime.TrackingServices) 1149 { 1150 _services = runtime.TrackingServices; 1151 foreach (TrackingService service in _services) 1152 { 1153 if (service is IProfileNotification) 1154 { 1155 ((IProfileNotification)service).ProfileUpdated += new EventHandler<ProfileUpdatedEventArgs>(ProfileUpdated); 1156 ((IProfileNotification)service).ProfileRemoved += new EventHandler<ProfileRemovedEventArgs>(ProfileRemoved); 1157 } 1158 } 1159 } 1160 _init = true; 1161 } 1162 } 1163 /// <summary> 1164 /// Clean up static state 1165 /// </summary> Uninitialize()1166 internal void Uninitialize() 1167 { 1168 lock (_cacheLock) 1169 { 1170 if (null != _runtime) 1171 { 1172 foreach (TrackingService service in _services) 1173 { 1174 if (service is IProfileNotification) 1175 { 1176 ((IProfileNotification)service).ProfileUpdated -= new EventHandler<ProfileUpdatedEventArgs>(ProfileUpdated); 1177 ((IProfileNotification)service).ProfileRemoved -= new EventHandler<ProfileRemovedEventArgs>(ProfileRemoved); 1178 } 1179 } 1180 } 1181 _runtime = null; 1182 _services = null; 1183 _init = false; 1184 } 1185 } 1186 /// <summary> 1187 /// Retrieves the current version of a profile from the specified service 1188 /// </summary> GetProfile(TrackingService service, Activity schedule)1189 internal RTTrackingProfile GetProfile(TrackingService service, Activity schedule) 1190 { 1191 if (!_init) 1192 throw new ApplicationException(ExecutionStringManager.TrackingProfileManagerNotInitialized); 1193 1194 if ((null == service) || (null == schedule)) 1195 { 1196 WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullParameters); 1197 return null; 1198 } 1199 1200 Type workflowType = schedule.GetType(); 1201 RTTrackingProfile tp = null; 1202 if (service is IProfileNotification) 1203 { 1204 // 1205 // If we found the profile in the cache return it, it may be null, this is OK 1206 // (no profile for this service type/schedule type combination) 1207 if (TryGetFromCache(service.GetType(), workflowType, out tp)) 1208 return tp; 1209 } 1210 // 1211 // Either we don't have anything in the cache for this schedule/service combination 1212 // or this is a base TrackingService that doesn't notify of profile updates 1213 // Get the profile from the service 1214 TrackingProfile profile = null; 1215 1216 if (!service.TryGetProfile(workflowType, out profile)) 1217 { 1218 // 1219 // No profile for this schedule from this service 1220 // RemoveProfile will just mark this service/schedule as not currently having a profile in the cache 1221 RemoveProfile(workflowType, service.GetType()); 1222 return null; 1223 } 1224 // 1225 // Check the cache to see if we already have this version 1226 // For TrackingService types this is necessary. 1227 // For IProfileNotification types this is a bit redundant 1228 // but another threadcould have inserted the profile into the cache 1229 // so check again before acquiring the writer lock 1230 if (TryGetFromCache(service.GetType(), workflowType, profile.Version, out tp)) 1231 return tp; 1232 // 1233 // No profile, create it 1234 string xaml = schedule.GetValue(Activity.WorkflowXamlMarkupProperty) as string; 1235 if (null != xaml && xaml.Length > 0) 1236 { 1237 // 1238 // Never add xaml only workflows to the cache 1239 // Each one must be handled distinctly 1240 return CreateProfile(profile, schedule, service.GetType()); 1241 } 1242 else 1243 { 1244 tp = CreateProfile(profile, workflowType, service.GetType()); 1245 } 1246 1247 lock (_cacheLock) 1248 { 1249 // 1250 // Recheck the cache with exclusive access 1251 RTTrackingProfile tmp = null; 1252 if (TryGetFromCache(service.GetType(), workflowType, profile.Version, out tmp)) 1253 return tmp; 1254 // 1255 // Add it to the cache 1256 if (!AddToCache(tp, service.GetType())) 1257 throw new ApplicationException(ExecutionStringManager.ProfileCacheInsertFailure); 1258 1259 return tp; 1260 } 1261 } 1262 /// <summary> 1263 /// Retrieves the specified version of a profile from the specified service 1264 /// </summary> GetProfile(TrackingService service, Activity workflow, Version versionId)1265 internal RTTrackingProfile GetProfile(TrackingService service, Activity workflow, Version versionId) 1266 { 1267 if (null == service) 1268 throw new ArgumentNullException("service"); 1269 if (null == workflow) 1270 throw new ArgumentNullException("workflow"); 1271 1272 if (!_init) 1273 throw new InvalidOperationException(ExecutionStringManager.TrackingProfileManagerNotInitialized); 1274 1275 Type workflowType = workflow.GetType(); 1276 RTTrackingProfile tp = null; 1277 // 1278 // Looking for a specific version, see if it is in the cache 1279 if (TryGetFromCache(service.GetType(), workflowType, versionId, out tp)) 1280 return tp; 1281 1282 TrackingProfile profile = service.GetProfile(workflowType, versionId); 1283 // 1284 // No profile, create it 1285 string xaml = workflow.GetValue(Activity.WorkflowXamlMarkupProperty) as string; 1286 if (null != xaml && xaml.Length > 0) 1287 { 1288 // 1289 // Never add xaml only workflows to the cache 1290 // Each one must be handled distinctly 1291 return CreateProfile(profile, workflow, service.GetType()); 1292 } 1293 else 1294 { 1295 tp = CreateProfile(profile, workflowType, service.GetType()); 1296 } 1297 1298 lock (_cacheLock) 1299 { 1300 // 1301 // Recheck the cache with exclusive access 1302 RTTrackingProfile tmp = null; 1303 if (TryGetFromCache(service.GetType(), workflowType, versionId, out tmp)) 1304 return tmp; 1305 // 1306 // Add it to the cache 1307 if (!AddToCache(tp, service.GetType())) 1308 throw new ApplicationException(ExecutionStringManager.ProfileCacheInsertFailure); 1309 1310 return tp; 1311 } 1312 } 1313 GetProfile(TrackingService service, Activity workflow, Guid instanceId)1314 internal RTTrackingProfile GetProfile(TrackingService service, Activity workflow, Guid instanceId) 1315 { 1316 // 1317 // An instance based profile will never be in the cache 1318 TrackingProfile profile = service.GetProfile(instanceId); 1319 1320 if (null == profile) 1321 return null; 1322 1323 return new RTTrackingProfile(profile, workflow, service.GetType()); 1324 } 1325 1326 #region Private Methods 1327 CreateProfile(TrackingProfile profile, Type workflowType, Type serviceType)1328 private RTTrackingProfile CreateProfile(TrackingProfile profile, Type workflowType, Type serviceType) 1329 { 1330 // 1331 // Can't use the activity definition that we have here, it may have been updated 1332 // Get the base definition and use it to create the profile. 1333 Activity tmpSchedule = _runtime.GetWorkflowDefinition(workflowType); 1334 return new RTTrackingProfile(profile, tmpSchedule, serviceType); 1335 } CreateProfile(TrackingProfile profile, Activity schedule, Type serviceType)1336 private RTTrackingProfile CreateProfile(TrackingProfile profile, Activity schedule, Type serviceType) 1337 { 1338 // 1339 // This is called for Xaml only workflows 1340 return new RTTrackingProfile(profile, schedule, serviceType); 1341 } 1342 /// <summary> 1343 /// Add a profile to the cache but do not reset the NoProfiles flag for the schedule type 1344 /// </summary> 1345 /// <param name="profile">RTTrackingProfile to add</param> 1346 /// <param name="serviceType">TrackingService type</param> 1347 /// <returns>True if the profile was successfully added; false if not</returns> AddToCache(RTTrackingProfile profile, Type serviceType)1348 private bool AddToCache(RTTrackingProfile profile, Type serviceType) 1349 { 1350 return AddToCache(profile, serviceType, false); 1351 } 1352 /// <summary> 1353 /// Adds a profile to the cache and optionally resets the NoProfiles flag for the schedule type 1354 /// </summary> 1355 /// <param name="profile">RTTrackingProfile to add</param> 1356 /// <param name="serviceType">TrackingService type</param> 1357 /// <param name="resetNoProfiles">true will reset NoProfiles (to false); false will leave NoProfiles as is</param> 1358 /// <returns>True if the profile was successfully added; false if not</returns> AddToCache(RTTrackingProfile profile, Type serviceType, bool resetNoProfiles)1359 private bool AddToCache(RTTrackingProfile profile, Type serviceType, bool resetNoProfiles) 1360 { 1361 // 1362 // Profile may be null, serviceType may not 1363 if (null == serviceType) 1364 return false; 1365 1366 lock (_cacheLock) 1367 { 1368 Dictionary<Type, ProfileList> schedules = null; 1369 // 1370 // Get the dictionary for the service type, 1371 // create it if it doesn't exist 1372 if (!_cacheLookup.TryGetValue(serviceType, out schedules)) 1373 { 1374 schedules = new Dictionary<Type, ProfileList>(); 1375 _cacheLookup.Add(serviceType, schedules); 1376 } 1377 // 1378 // The the ProfileList for the schedule type, 1379 // create it if it doesn't exist 1380 ProfileList profiles = null; 1381 if (!schedules.TryGetValue(profile.WorkflowType, out profiles)) 1382 { 1383 profiles = new ProfileList(); 1384 schedules.Add(profile.WorkflowType, profiles); 1385 } 1386 if (resetNoProfiles) 1387 profiles.NoProfile = false; 1388 return profiles.Profiles.TryAdd(new CacheItem(profile)); 1389 } 1390 } 1391 /// <summary> 1392 /// Gets a profile from the cache 1393 /// </summary> TryGetFromCache(Type serviceType, Type workflowType, out RTTrackingProfile profile)1394 private bool TryGetFromCache(Type serviceType, Type workflowType, out RTTrackingProfile profile) 1395 { 1396 return TryGetFromCache(serviceType, workflowType, new Version(0, 0), out profile); // 0 is an internal signal to get the most current 1397 } 1398 /// <summary> 1399 /// Gets a profile from the cache 1400 /// </summary> TryGetFromCache(Type serviceType, Type workflowType, Version versionId, out RTTrackingProfile profile)1401 private bool TryGetFromCache(Type serviceType, Type workflowType, Version versionId, out RTTrackingProfile profile) 1402 { 1403 profile = null; 1404 CacheItem item = null; 1405 lock (_cacheLock) 1406 { 1407 Dictionary<Type, ProfileList> schedules = null; 1408 1409 if (!_cacheLookup.TryGetValue(serviceType, out schedules)) 1410 return false; 1411 1412 ProfileList profiles = null; 1413 if (!schedules.TryGetValue(workflowType, out profiles)) 1414 return false; 1415 1416 // 1417 // 0 means get the current version 1418 if (0 == versionId.Major) 1419 { 1420 // 1421 // Currently the schedule type doesn't have a profile associated to it 1422 if (profiles.NoProfile) 1423 return true; 1424 1425 if ((null == profiles.Profiles) || (0 == profiles.Profiles.Count)) 1426 return false; 1427 1428 // 1429 // Current version is highest versionId 1430 // which means it is at the end of the Set 1431 int endPos = profiles.Profiles.Count - 1; 1432 1433 if (null == profiles.Profiles[endPos]) 1434 return false; 1435 1436 profile = profiles.Profiles[endPos].TrackingProfile; 1437 return true; 1438 } 1439 else 1440 { 1441 if ((null == profiles.Profiles) || (0 == profiles.Profiles.Count)) 1442 return false; 1443 1444 if (profiles.Profiles.TryGetValue(new CacheItem(workflowType, versionId), out item)) 1445 { 1446 profile = item.TrackingProfile; 1447 return true; 1448 } 1449 else 1450 return false; 1451 } 1452 } 1453 } 1454 1455 #endregion 1456 1457 #region Event Handlers 1458 /// <summary> 1459 /// Listens on ProfileUpdated events from IProfileNotification services 1460 /// </summary> 1461 /// <param name="sender">Type of the tracking service sending the update</param> 1462 /// <param name="e">ProfileUpdatedEventArgs containing the new profile and the schedule type</param> ProfileUpdated(object sender, ProfileUpdatedEventArgs e)1463 private void ProfileUpdated(object sender, ProfileUpdatedEventArgs e) 1464 { 1465 if (null == sender) 1466 throw new ArgumentNullException("sender"); 1467 1468 Type t = sender.GetType(); 1469 1470 if (null == e.WorkflowType) 1471 throw new ArgumentNullException("e"); 1472 1473 if (null == e.TrackingProfile) 1474 { 1475 RemoveProfile(e.WorkflowType, t); 1476 return; 1477 } 1478 1479 RTTrackingProfile profile = CreateProfile(e.TrackingProfile, e.WorkflowType, t); 1480 // 1481 // If AddToCache fails this version is already in the cache and we don't care 1482 AddToCache(profile, t, true); 1483 } 1484 ProfileRemoved(object sender, ProfileRemovedEventArgs e)1485 private void ProfileRemoved(object sender, ProfileRemovedEventArgs e) 1486 { 1487 if (null == sender) 1488 throw new ArgumentNullException("sender"); 1489 1490 if (null == e.WorkflowType) 1491 throw new ArgumentNullException("e"); 1492 1493 RemoveProfile(e.WorkflowType, sender.GetType()); 1494 } 1495 RemoveProfile(Type workflowType, Type serviceType)1496 private void RemoveProfile(Type workflowType, Type serviceType) 1497 { 1498 lock (_cacheLock) 1499 { 1500 Dictionary<Type, ProfileList> schedules = null; 1501 1502 if (!_cacheLookup.TryGetValue(serviceType, out schedules)) 1503 { 1504 schedules = new Dictionary<Type, ProfileList>(); 1505 _cacheLookup.Add(serviceType, schedules); 1506 } 1507 ProfileList profiles = null; 1508 if (!schedules.TryGetValue(workflowType, out profiles)) 1509 { 1510 profiles = new ProfileList(); 1511 schedules.Add(workflowType, profiles); 1512 } 1513 // 1514 // Finally indicate that there isn't a profile for this schedule type 1515 // Calling UpdateProfile for this type will result in resetting this field 1516 // regardless of whether the version of the profile passed is in the cache or not 1517 profiles.NoProfile = true; 1518 } 1519 } 1520 #endregion 1521 1522 #region Private Classes 1523 private class ProfileList 1524 { 1525 internal bool NoProfile = false; 1526 internal Set<CacheItem> Profiles = new Set<CacheItem>(5); 1527 } 1528 1529 private class CacheItem : IComparable 1530 { 1531 internal RTTrackingProfile TrackingProfile = null; 1532 internal DateTime LastAccess = DateTime.UtcNow; 1533 // 1534 // VersionId and ScheduleType are stored separately from the profile so that they 1535 // can be used to identify the profile if it has been pushed from the cache. 1536 internal Version VersionId = new Version(0, 0); 1537 internal Type ScheduleType = null; 1538 CacheItem()1539 internal CacheItem() 1540 { 1541 } 1542 CacheItem(RTTrackingProfile profile)1543 internal CacheItem(RTTrackingProfile profile) 1544 { 1545 if (null == profile) 1546 throw new ArgumentNullException("profile"); 1547 1548 ScheduleType = profile.WorkflowType; 1549 1550 this.TrackingProfile = profile; 1551 VersionId = profile.Version; 1552 } 1553 CacheItem(Type workflowType, Version versionId)1554 internal CacheItem(Type workflowType, Version versionId) 1555 { 1556 VersionId = versionId; 1557 ScheduleType = workflowType; 1558 } 1559 1560 #region IComparable Members 1561 CompareTo(object obj)1562 public int CompareTo(object obj) 1563 { 1564 if (!(obj is CacheItem)) 1565 throw new ArgumentException(ExecutionStringManager.InvalidCacheItem); 1566 1567 CacheItem item = (CacheItem)obj; 1568 if ((VersionId == item.VersionId) && (ScheduleType == item.ScheduleType)) 1569 return 0; 1570 else 1571 return (VersionId > item.VersionId) ? 1 : -1; 1572 } 1573 1574 #endregion 1575 } 1576 #endregion 1577 1578 } 1579 /// <summary> 1580 /// Represents a wrapper around a channel and its artifacts, such as its tracking service type and profile 1581 /// </summary> 1582 internal class TrackingChannelWrapper 1583 { 1584 private Type _serviceType = null, _scheduleType = null; 1585 private TrackingChannel _channel = null; 1586 [NonSerialized] 1587 private RTTrackingProfile _profile = null; 1588 private Version _profileVersionId; 1589 TrackingChannelWrapper()1590 private TrackingChannelWrapper() { } 1591 TrackingChannelWrapper(TrackingChannel channel, Type serviceType, Type workflowType, RTTrackingProfile profile)1592 public TrackingChannelWrapper(TrackingChannel channel, Type serviceType, Type workflowType, RTTrackingProfile profile) 1593 { 1594 _serviceType = serviceType; 1595 _scheduleType = workflowType; 1596 _channel = channel; 1597 _profile = profile; 1598 _profileVersionId = profile.Version; 1599 } 1600 1601 internal Type TrackingServiceType 1602 { 1603 get { return _serviceType; } 1604 } 1605 1606 internal TrackingChannel TrackingChannel 1607 { 1608 get { return _channel; } 1609 } 1610 /// <summary> 1611 /// Get the tracking profile for the channel 1612 /// </summary> 1613 /// <param name="exec">BaseExecutor</param> 1614 /// <returns>RTTrackingProfile</returns> GetTrackingProfile(WorkflowExecutor skedExec)1615 internal RTTrackingProfile GetTrackingProfile(WorkflowExecutor skedExec) 1616 { 1617 if (null != _profile) 1618 return _profile; 1619 else 1620 throw new InvalidOperationException(String.Format(System.Globalization.CultureInfo.CurrentCulture, ExecutionStringManager.NullProfileForChannel, this._scheduleType.AssemblyQualifiedName)); 1621 } 1622 SetTrackingProfile(RTTrackingProfile profile)1623 internal void SetTrackingProfile(RTTrackingProfile profile) 1624 { 1625 _profile = profile; 1626 } 1627 1628 /// <summary> 1629 /// Clone the tracking profile stored in the cache and 1630 /// </summary> 1631 /// <param name="exec"></param> MakeProfilePrivate(WorkflowExecutor exec)1632 internal void MakeProfilePrivate(WorkflowExecutor exec) 1633 { 1634 if (null != _profile) 1635 { 1636 // 1637 // If the profile is not already a private copy make it so 1638 if (!_profile.IsPrivate) 1639 { 1640 _profile = _profile.Clone(); 1641 _profile.IsPrivate = true; 1642 } 1643 } 1644 else 1645 { 1646 // 1647 // We're not holding a reference to a profile 1648 // so get it from the cache and clone it into a private copy 1649 RTTrackingProfile tmp = GetTrackingProfile(exec); 1650 _profile = tmp.Clone(); 1651 _profile.IsPrivate = true; 1652 } 1653 } 1654 } 1655 internal class Set<T> : IEnumerable<T> where T : IComparable 1656 { 1657 List<T> list = null; 1658 Set()1659 public Set() 1660 { 1661 list = new List<T>(); 1662 } 1663 Set(int capacity)1664 public Set(int capacity) 1665 { 1666 list = new List<T>(capacity); 1667 } 1668 1669 public int Count 1670 { 1671 get { return list.Count; } 1672 } 1673 Add(T item)1674 public void Add(T item) 1675 { 1676 int pos = -1; 1677 if (!Search(item, out pos)) 1678 list.Insert(pos, item); 1679 else 1680 throw new ArgumentException(ExecutionStringManager.ItemAlreadyExist); 1681 } 1682 TryAdd(T item)1683 public bool TryAdd(T item) 1684 { 1685 int pos = -1; 1686 if (!Search(item, out pos)) 1687 { 1688 list.Insert(pos, item); 1689 return true; 1690 } 1691 else 1692 return false; 1693 } 1694 Contains(T item)1695 public bool Contains(T item) 1696 { 1697 int pos = -1; 1698 return Search(item, out pos); 1699 } 1700 GetEnumerator()1701 public IEnumerator<T> GetEnumerator() 1702 { 1703 return list.GetEnumerator(); 1704 } 1705 IEnumerable.GetEnumerator()1706 System.Collections.IEnumerator IEnumerable.GetEnumerator() 1707 { 1708 return list.GetEnumerator(); 1709 } 1710 TryGetValue(T item, out T value)1711 public bool TryGetValue(T item, out T value) 1712 { 1713 int pos = -1; 1714 if (Search(item, out pos)) 1715 { 1716 value = list[pos]; 1717 return true; 1718 } 1719 else 1720 { 1721 value = default(T); 1722 return false; 1723 } 1724 } 1725 1726 public T this[int index] 1727 { 1728 get { return list[index]; } 1729 } 1730 Search(T item, out int insertPos)1731 private bool Search(T item, out int insertPos) 1732 { 1733 insertPos = -1; 1734 1735 int pos = 0, 1736 high = list.Count, 1737 low = -1, 1738 diff = 0; 1739 1740 while (high - low > 1) 1741 { 1742 pos = (high + low) / 2; 1743 1744 diff = list[pos].CompareTo(item); 1745 1746 if (0 == diff) 1747 { 1748 insertPos = pos; 1749 return true; 1750 } 1751 else if (diff > 0) 1752 high = pos; 1753 else 1754 low = pos; 1755 } 1756 1757 if (low == -1) 1758 { 1759 insertPos = 0; 1760 return false; 1761 } 1762 1763 if (0 != diff) 1764 { 1765 insertPos = (diff < 0) ? pos + 1 : pos; 1766 return false; 1767 } 1768 1769 return true; 1770 } 1771 } 1772 1773 /// <summary> 1774 /// Persisted tracking State pertaining to workflow invoking for an individual schedule. There could be multiple called schedules under 1775 /// an instance. 1776 /// </summary> 1777 [Serializable] 1778 internal class TrackingCallingState 1779 { 1780 #region data members 1781 private IList<string> callerActivityPathProxy; 1782 private Guid callerInstanceId; 1783 private Guid callerContextGuid; 1784 private Guid callerParentContextGuid; 1785 1786 #endregion data members 1787 1788 #region Property accessors 1789 1790 /// <summary> 1791 /// Activity proxy of the caller/execer activity, if any 1792 /// //@@Undone for Ashishmi: Hold on to ActivityPath Proxy in one of your class impl 1793 /// </summary> 1794 /// <value></value> 1795 internal IList<string> CallerActivityPathProxy 1796 { 1797 get { return callerActivityPathProxy; } 1798 set { callerActivityPathProxy = value; } 1799 } 1800 1801 /// <summary> 1802 /// Instance ID of the caller/exec'er schedule, if any 1803 /// </summary> 1804 /// <value></value> 1805 public Guid CallerWorkflowInstanceId 1806 { 1807 get { return callerInstanceId; } 1808 set { callerInstanceId = value; } 1809 } 1810 /// <summary> 1811 /// Context of the caller's invoke activity 1812 /// </summary> 1813 /// <value>int</value> 1814 public Guid CallerContextGuid 1815 { 1816 get { return callerContextGuid; } 1817 set { callerContextGuid = value; } 1818 } 1819 /// <summary> 1820 /// ParentContext of the caller's invoke activity 1821 /// </summary> 1822 /// <value>int</value> 1823 public Guid CallerParentContextGuid 1824 { 1825 get { return callerParentContextGuid; } 1826 set { callerParentContextGuid = value; } 1827 } 1828 1829 #endregion Property accessors 1830 1831 } 1832 1833 internal static class HashHelper 1834 { HashServiceType(Type serviceType)1835 internal static Guid HashServiceType(Type serviceType) 1836 { 1837 return HashServiceType(serviceType.AssemblyQualifiedName); 1838 } 1839 1840 [SuppressMessage("Microsoft.Cryptographic.Standard", "CA5350:MD5CannotBeUsed", 1841 Justification = "Design has been approved. We are not using MD5 for any security or cryptography purposes but rather as a hash.")] HashServiceType(String serviceFullTypeName)1842 internal static Guid HashServiceType(String serviceFullTypeName) 1843 { 1844 byte[] data; 1845 byte[] result; 1846 1847 UnicodeEncoding ue = new UnicodeEncoding(); 1848 data = ue.GetBytes(serviceFullTypeName); 1849 1850 if (AppSettings.FIPSRequired) 1851 { 1852 result = MD5PInvokeHelper.CalculateHash(data); 1853 } 1854 else 1855 { 1856 MD5 md5 = new MD5CryptoServiceProvider(); 1857 result = md5.ComputeHash(data); 1858 } 1859 1860 return new Guid(result); 1861 } 1862 } 1863 } 1864