// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

#if NO_SEMAPHORE
using System.Reactive.Threading;
#endif

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that time-shifts the notifications of a source sequence, either by a relative
    /// due time (TimeSpan) or until an absolute due time (DateTimeOffset), using the given scheduler.
    /// Exactly one of the two due time fields is set, depending on the constructor used.
    /// </summary>
    class Delay<TSource> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly TimeSpan? _dueTimeR;
        private readonly DateTimeOffset? _dueTimeA;
        private readonly IScheduler _scheduler;

        /// <summary>
        /// Creates a delay with a relative due time.
        /// </summary>
        public Delay(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            _source = source;
            _dueTimeR = dueTime;
            _scheduler = scheduler;
        }

        /// <summary>
        /// Creates a delay with an absolute due time.
        /// </summary>
        public Delay(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            _source = source;
            _dueTimeA = dueTime;
            _scheduler = scheduler;
        }

        /// <summary>
        /// Picks the long-running sink when the scheduler exposes long-running support,
        /// and the recursive-scheduling sink otherwise.
        /// </summary>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_scheduler.AsLongRunning() != null)
            {
                var sink = new LongRunningImpl(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
        }

        /// <summary>
        /// Sink that queues notifications together with their due time and drains the queue
        /// through recursive scheduling on the parent's scheduler.
        /// </summary>
        class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly Delay<TSource> _parent;

            public _(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private IScheduler _scheduler;
            private IDisposable _sourceSubscription;
            private SerialDisposable _cancelable;
            private TimeSpan _delay;
            private IStopwatch _watch;

            private object _gate;
            private bool _ready;    // false until Start() has run in absolute due time mode
            private bool _active;   // a drain has been (or is about to be) scheduled
            private bool _running;  // DrainQueue is currently executing its dispatch loop
            private Queue<System.Reactive.TimeInterval<TSource>> _queue;
            private bool _hasCompleted;
            private TimeSpan _completeAt;
            private bool _hasFailed;
            private Exception _exception;

            /// <summary>
            /// Initializes the sink state, arms the absolute start timer (if any) and subscribes to the source.
            /// </summary>
            /// <returns>Disposable covering both the source subscription and the scheduled work.</returns>
            public IDisposable Run()
            {
                _scheduler = _parent._scheduler;

                _cancelable = new SerialDisposable();

                _gate = new object();
                _active = false;
                _running = false;
                _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
                _hasCompleted = false;
                _completeAt = default(TimeSpan);
                _hasFailed = false;
                _exception = default(Exception);

                _watch = _scheduler.StartStopwatch();

                if (_parent._dueTimeA.HasValue)
                {
                    // Absolute mode: notifications are only queued until Start() fires at the due time.
                    _ready = false;

                    var dueTimeA = _parent._dueTimeA.Value;
                    _cancelable.Disposable = _scheduler.Schedule(dueTimeA, Start);
                }
                else
                {
                    // Relative mode: the delay is known up front.
                    _ready = true;

                    var dueTimeR = _parent._dueTimeR.Value;
                    _delay = Scheduler.Normalize(dueTimeR);
                }

                var sourceSubscription = new SingleAssignmentDisposable();
                _sourceSubscription = sourceSubscription;
                sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

                return new CompositeDisposable(_sourceSubscription, _cancelable);
            }

            /// <summary>
            /// Runs at the absolute due time: fixes the delay to the time elapsed so far, shifts
            /// every queued item by that delay and schedules the first drain if items are pending.
            /// </summary>
            private void Start()
            {
                var next = default(TimeSpan);
                var shouldRun = false;

                lock (_gate)
                {
                    _delay = _watch.Elapsed;

                    var oldQueue = _queue;
                    _queue = new Queue<Reactive.TimeInterval<TSource>>();

                    if (oldQueue.Count > 0)
                    {
                        // Items were stamped with their arrival time (the delay was still zero), so the
                        // first item's stamp is also its remaining wait measured from now.
                        next = oldQueue.Peek().Interval;

                        while (oldQueue.Count > 0)
                        {
                            var item = oldQueue.Dequeue();
                            _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
                        }

                        shouldRun = true;
                        _active = true;
                    }

                    // NOTE(review): if OnCompleted arrived before this point and the queue is empty,
                    // _active is already true and no drain is scheduled here, so the completion looks
                    // like it is never forwarded - confirm against the intended semantics.
                    _ready = true;
                }

                if (shouldRun)
                {
                    _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
                }
            }

            /// <summary>
            /// Queues the value with its due time and schedules a drain when none is active.
            /// </summary>
            public void OnNext(TSource value)
            {
                var shouldRun = false;

                lock (_gate)
                {
                    // The due time is computed under the gate because Start() writes _delay and
                    // re-bases the queue under the same gate. Reading _delay outside the lock could
                    // stamp a value with the pre-start delay and enqueue it after the re-base.
                    var next = _watch.Elapsed.Add(_delay);

                    _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

                    shouldRun = _ready && !_active;
                    _active = true;
                }

                if (shouldRun)
                {
                    // shouldRun implies _ready was observed under the gate, so _delay is final here.
                    _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
                }
            }

            /// <summary>
            /// Drops all pending values and forwards the error, either right here or from the
            /// drain loop when that loop is currently running.
            /// </summary>
            public void OnError(Exception error)
            {
                _sourceSubscription.Dispose();

                var shouldRun = false;

                lock (_gate)
                {
                    _queue.Clear();

                    _exception = error;
                    _hasFailed = true;

                    // When the drain loop is running, it picks up _hasFailed and reports the error itself.
                    shouldRun = !_running;
                }

                if (shouldRun)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Records the due time of the completion and schedules a drain when none is active.
            /// </summary>
            public void OnCompleted()
            {
                _sourceSubscription.Dispose();

                var shouldRun = false;

                lock (_gate)
                {
                    // Computed under the gate for the same reason as in OnNext.
                    _completeAt = _watch.Elapsed.Add(_delay);
                    _hasCompleted = true;

                    shouldRun = _ready && !_active;
                    _active = true;
                }

                if (shouldRun)
                {
                    _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
                }
            }

            /// <summary>
            /// Dispatches at most one due value per invocation and then reschedules itself through
            /// <paramref name="recurse"/> for the next pending item or the pending completion.
            /// </summary>
            /// <param name="recurse">Callback used to schedule the next drain after the given time.</param>
            private void DrainQueue(Action<TimeSpan> recurse)
            {
                lock (_gate)
                {
                    if (_hasFailed)
                        return;
                    _running = true;
                }

                //
                // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
                // implementation, the loop below kept running while there was work for immediate dispatch,
                // potentially causing a long running work item on the target scheduler. With the addition
                // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
                // interface and perform different processing (see LongRunningImpl). To reduce the code
                // churn in the old loop code here, we set the shouldYield flag to true after the first
                // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
                //
                var shouldYield = false;

                while (true)
                {
                    var hasFailed = false;
                    var error = default(Exception);

                    var hasValue = false;
                    var value = default(TSource);
                    var hasCompleted = false;

                    var shouldRecurse = false;
                    var recurseDueTime = default(TimeSpan);

                    lock (_gate)
                    {
                        if (_hasFailed)
                        {
                            error = _exception;
                            hasFailed = true;
                            _running = false;
                        }
                        else
                        {
                            var now = _watch.Elapsed;

                            if (_queue.Count > 0)
                            {
                                var nextDue = _queue.Peek().Interval;

                                if (nextDue.CompareTo(now) <= 0 && !shouldYield)
                                {
                                    value = _queue.Dequeue().Value;
                                    hasValue = true;
                                }
                                else
                                {
                                    shouldRecurse = true;
                                    recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
                                    _running = false;
                                }
                            }
                            else if (_hasCompleted)
                            {
                                if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
                                {
                                    hasCompleted = true;
                                }
                                else
                                {
                                    shouldRecurse = true;
                                    recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
                                    _running = false;
                                }
                            }
                            else
                            {
                                // Nothing left to do; the next OnNext/OnCompleted schedules a new drain.
                                _running = false;
                                _active = false;
                            }
                        }
                    } /* lock (_gate) */

                    if (hasValue)
                    {
                        base._observer.OnNext(value);
                        shouldYield = true;
                    }
                    else
                    {
                        if (hasCompleted)
                        {
                            base._observer.OnCompleted();
                            base.Dispose();
                        }
                        else if (hasFailed)
                        {
                            base._observer.OnError(error);
                            base.Dispose();
                        }
                        else if (shouldRecurse)
                        {
                            recurse(recurseDueTime);
                        }

                        return;
                    }
                } /* while (true) */
            }
        }

        /// <summary>
        /// Sink used when the scheduler supports long-running work: a single long-running drain
        /// loop waits on a semaphore that is released once for every incoming notification.
        /// </summary>
        class LongRunningImpl : Sink<TSource>, IObserver<TSource>
        {
            private readonly Delay<TSource> _parent;

            public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private IDisposable _sourceSubscription;
            private SerialDisposable _cancelable;
            private TimeSpan _delay;
            private IStopwatch _watch;

            private object _gate;
#if !NO_CDS
            private SemaphoreSlim _evt;
            private CancellationTokenSource _stop;
#else
            private Semaphore _evt;
            private bool _stopped;
            private ManualResetEvent _stop;
#endif
            private Queue<System.Reactive.TimeInterval<TSource>> _queue;
            private bool _hasCompleted;
            private TimeSpan _completeAt;
            private bool _hasFailed;
            private Exception _exception;

            /// <summary>
            /// Initializes the sink state, starts (or schedules the start of) the drain loop and
            /// subscribes to the source.
            /// </summary>
            /// <returns>Disposable covering both the source subscription and the drain loop.</returns>
            public IDisposable Run()
            {
                _cancelable = new SerialDisposable();

                _gate = new object();
#if !NO_CDS
                _evt = new SemaphoreSlim(0);
#else
                _evt = new Semaphore(0, int.MaxValue);
#endif
                _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
                _hasCompleted = false;
                _completeAt = default(TimeSpan);
                _hasFailed = false;
                _exception = default(Exception);

                _watch = _parent._scheduler.StartStopwatch();

                if (_parent._dueTimeA.HasValue)
                {
                    // Absolute mode: the drain loop is only started once the due time is reached.
                    var dueTimeA = _parent._dueTimeA.Value;
                    _cancelable.Disposable = _parent._scheduler.Schedule(dueTimeA, Start);
                }
                else
                {
                    var dueTimeR = _parent._dueTimeR.Value;
                    _delay = Scheduler.Normalize(dueTimeR);
                    ScheduleDrain();
                }

                var sourceSubscription = new SingleAssignmentDisposable();
                _sourceSubscription = sourceSubscription;
                sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

                return new CompositeDisposable(_sourceSubscription, _cancelable);
            }

            /// <summary>
            /// Runs at the absolute due time: fixes the delay to the time elapsed so far, shifts
            /// every queued item by that delay and starts the drain loop.
            /// </summary>
            private void Start()
            {
                lock (_gate)
                {
                    _delay = _watch.Elapsed;

                    var oldQueue = _queue;
                    _queue = new Queue<Reactive.TimeInterval<TSource>>();

                    while (oldQueue.Count > 0)
                    {
                        var item = oldQueue.Dequeue();
                        _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
                    }
                }

                ScheduleDrain();
            }

            /// <summary>
            /// Installs the stop signal as the cancelable resource and starts DrainQueue as
            /// long-running work on the scheduler.
            /// </summary>
            private void ScheduleDrain()
            {
#if !NO_CDS
                _stop = new CancellationTokenSource();
                _cancelable.Disposable = Disposable.Create(() => _stop.Cancel());
#else
                _stop = new ManualResetEvent(false);
                _cancelable.Disposable = Disposable.Create(() =>
                {
                    _stopped = true;
                    _stop.Set();
                    _evt.Release();
                });
#endif

                _parent._scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
            }

            /// <summary>
            /// Queues the value with its due time and signals the drain loop.
            /// </summary>
            public void OnNext(TSource value)
            {
                lock (_gate)
                {
                    // The due time is computed under the gate because Start() writes _delay and
                    // re-bases the queue under the same gate. Reading _delay outside the lock could
                    // stamp a value with the pre-start delay and enqueue it after the re-base.
                    var next = _watch.Elapsed.Add(_delay);

                    _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

                    _evt.Release();
                }
            }

            /// <summary>
            /// Drops all pending values, records the error and signals the drain loop.
            /// </summary>
            public void OnError(Exception error)
            {
                _sourceSubscription.Dispose();

                lock (_gate)
                {
                    _queue.Clear();

                    _exception = error;
                    _hasFailed = true;

                    _evt.Release();
                }
            }

            /// <summary>
            /// Records the due time of the completion and signals the drain loop.
            /// </summary>
            public void OnCompleted()
            {
                _sourceSubscription.Dispose();

                lock (_gate)
                {
                    // Computed under the gate for the same reason as in OnNext.
                    _completeAt = _watch.Elapsed.Add(_delay);
                    _hasCompleted = true;

                    _evt.Release();
                }
            }

            /// <summary>
            /// Long-running drain loop: waits for a signal, takes one notification, waits until it
            /// is due and forwards it. Returns on cancellation or after a terminal notification.
            /// </summary>
            /// <param name="cancel">Not used by this method; stopping is driven by the _stop field.</param>
            private void DrainQueue(ICancelable cancel)
            {
                while (true)
                {
#if !NO_CDS
                    try
                    {
                        _evt.Wait(_stop.Token);
                    }
                    catch (OperationCanceledException)
                    {
                        return;
                    }
#else
                    _evt.WaitOne();
                    if (_stopped)
                        return;
#endif

                    var hasFailed = false;
                    var error = default(Exception);

                    var hasValue = false;
                    var value = default(TSource);
                    var hasCompleted = false;

                    var shouldWait = false;
                    var waitTime = default(TimeSpan);

                    lock (_gate)
                    {
                        if (_hasFailed)
                        {
                            error = _exception;
                            hasFailed = true;
                        }
                        else
                        {
                            var now = _watch.Elapsed;

                            if (_queue.Count > 0)
                            {
                                var next = _queue.Dequeue();

                                hasValue = true;
                                value = next.Value;

                                var nextDue = next.Interval;
                                if (nextDue.CompareTo(now) > 0)
                                {
                                    shouldWait = true;
                                    waitTime = Scheduler.Normalize(nextDue.Subtract(now));
                                }
                            }
                            else if (_hasCompleted)
                            {
                                hasCompleted = true;

                                if (_completeAt.CompareTo(now) > 0)
                                {
                                    shouldWait = true;
                                    waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
                                }
                            }
                        }
                    } /* lock (_gate) */

                    if (shouldWait)
                    {
                        // Block this loop until the scheduler's timer fires or the sink is stopped.
#if !NO_CDS
                        var timer = new ManualResetEventSlim();
                        _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });

                        try
                        {
                            timer.Wait(_stop.Token);
                        }
                        catch (OperationCanceledException)
                        {
                            return;
                        }
#else
                        var timer = new ManualResetEvent(false);
                        _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
                        if (WaitHandle.WaitAny(new[] { timer, _stop }) == 1)
                            return;
#endif
                    }

                    if (hasValue)
                    {
                        base._observer.OnNext(value);
                    }
                    else
                    {
                        if (hasCompleted)
                        {
                            base._observer.OnCompleted();
                            base.Dispose();
                        }
                        else if (hasFailed)
                        {
                            base._observer.OnError(error);
                            base.Dispose();
                        }

                        return;
                    }
                }
            }
        }
    }

    /// <summary>
    /// Producer that delays each source element until the observable returned by the delay
    /// selector for that element signals, optionally postponing the subscription to the source
    /// until a subscription delay observable signals.
    /// </summary>
    class Delay<TSource, TDelay> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly IObservable<TDelay> _subscriptionDelay;
        private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

        /// <summary>
        /// Creates the producer; a null <paramref name="subscriptionDelay"/> means the source is
        /// subscribed to immediately when the sink runs.
        /// </summary>
        public Delay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
        {
            _source = source;
            _subscriptionDelay = subscriptionDelay;
            _delaySelector = delaySelector;
        }

        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return sink.Run();
        }

        /// <summary>
        /// Sink that subscribes to one delay observable per source element and forwards the
        /// element when that delay observable produces a value or completes.
        /// </summary>
        class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly Delay<TSource, TDelay> _parent;

            public _(Delay<TSource, TDelay> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private CompositeDisposable _delays;
            private object _gate;
            private bool _atEnd;
            private SerialDisposable _subscription;

            /// <summary>
            /// Initializes the sink state and subscribes either to the source directly or to the
            /// subscription delay first.
            /// </summary>
            /// <returns>Disposable covering the current subscription and all pending delays.</returns>
            public IDisposable Run()
            {
                _delays = new CompositeDisposable();
                _gate = new object();
                _atEnd = false;
                _subscription = new SerialDisposable();

                if (_parent._subscriptionDelay == null)
                {
                    Start();
                }
                else
                {
                    // NOTE(review): if the subscription delay signals synchronously inside SubscribeSafe,
                    // Start() stores the source subscription in _subscription before this assignment
                    // replaces it - confirm whether that case needs separate handling.
                    _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
                }

                return new CompositeDisposable(_subscription, _delays);
            }

            /// <summary>
            /// Subscribes to the source, storing the subscription in the shared serial disposable.
            /// </summary>
            private void Start()
            {
                _subscription.Disposable = _parent._source.SubscribeSafe(this);
            }

            /// <summary>
            /// Obtains the delay observable for the value and subscribes to it; a selector failure
            /// is forwarded as an error.
            /// </summary>
            public void OnNext(TSource value)
            {
                var delay = default(IObservable<TDelay>);
                try
                {
                    delay = _parent._delaySelector(value);
                }
                catch (Exception error)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(error);
                        base.Dispose();
                    }

                    return;
                }

                // The handle is registered before subscribing so the Delta observer can remove it
                // from _delays once it has fired.
                var d = new SingleAssignmentDisposable();
                _delays.Add(d);
                d.Disposable = delay.SubscribeSafe(new Delta(this, value, d));
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Marks the source as finished; completion is forwarded once no delays are pending.
            /// </summary>
            public void OnCompleted()
            {
                lock (_gate)
                {
                    _atEnd = true;
                    _subscription.Dispose();

                    CheckDone();
                }
            }

            /// <summary>
            /// Forwards completion when the source has ended and no delays remain. Every call site
            /// in this file invokes it while holding _gate.
            /// </summary>
            private void CheckDone()
            {
                if (_atEnd && _delays.Count == 0)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }

            /// <summary>
            /// Observer for the subscription delay: starts the source subscription on the first
            /// value or on completion, and forwards errors.
            /// </summary>
            class SubscriptionDelay : IObserver<TDelay>
            {
                private readonly _ _parent;

                public SubscriptionDelay(_ parent)
                {
                    _parent = parent;
                }

                public void OnNext(TDelay value)
                {
                    _parent.Start();
                }

                public void OnError(Exception error)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }

                public void OnCompleted()
                {
                    _parent.Start();
                }
            }

            /// <summary>
            /// Observer for a single element's delay: forwards the captured element when the delay
            /// produces a value or completes, then removes its own subscription handle.
            /// </summary>
            class Delta : IObserver<TDelay>
            {
                private readonly _ _parent;
                private readonly TSource _value;
                private readonly IDisposable _self;

                public Delta(_ parent, TSource value, IDisposable self)
                {
                    _parent = parent;
                    _value = value;
                    _self = self;
                }

                public void OnNext(TDelay value)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnNext(_value);

                        _parent._delays.Remove(_self);
                        _parent.CheckDone();
                    }
                }

                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                public void OnCompleted()
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnNext(_value);

                        _parent._delays.Remove(_self);
                        _parent.CheckDone();
                    }
                }
            }
        }
    }
}
#endif