1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System.Collections.Generic;
5 using System.Reactive.Concurrency;
6 using System.Reactive.Disposables;
7 using System.Threading;
8 
9 #if NO_SEMAPHORE
10 using System.Reactive.Threading;
11 #endif
12 
13 namespace System.Reactive.Linq.ObservableImpl
14 {
15     class Delay<TSource> : Producer<TSource>
16     {
17         private readonly IObservable<TSource> _source;
18         private readonly TimeSpan? _dueTimeR;
19         private readonly DateTimeOffset? _dueTimeA;
20         private readonly IScheduler _scheduler;
21 
Delay(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)22         public Delay(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
23         {
24             _source = source;
25             _dueTimeR = dueTime;
26             _scheduler = scheduler;
27         }
28 
Delay(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)29         public Delay(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
30         {
31             _source = source;
32             _dueTimeA = dueTime;
33             _scheduler = scheduler;
34         }
35 
Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)36         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
37         {
38             if (_scheduler.AsLongRunning() != null)
39             {
40                 var sink = new LongRunningImpl(this, observer, cancel);
41                 setSink(sink);
42                 return sink.Run();
43             }
44             else
45             {
46                 var sink = new _(this, observer, cancel);
47                 setSink(sink);
48                 return sink.Run();
49             }
50         }
51 
52         class _ : Sink<TSource>, IObserver<TSource>
53         {
54             private readonly Delay<TSource> _parent;
55 
_(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)56             public _(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
57                 : base(observer, cancel)
58             {
59                 _parent = parent;
60             }
61 
62             private IScheduler _scheduler;
63             private IDisposable _sourceSubscription;
64             private SerialDisposable _cancelable;
65             private TimeSpan _delay;
66             private IStopwatch _watch;
67 
68             private object _gate;
69             private bool _ready;
70             private bool _active;
71             private bool _running;
72             private Queue<System.Reactive.TimeInterval<TSource>> _queue;
73             private bool _hasCompleted;
74             private TimeSpan _completeAt;
75             private bool _hasFailed;
76             private Exception _exception;
77 
Run()78             public IDisposable Run()
79             {
80                 _scheduler = _parent._scheduler;
81 
82                 _cancelable = new SerialDisposable();
83 
84                 _gate = new object();
85                 _active = false;
86                 _running = false;
87                 _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
88                 _hasCompleted = false;
89                 _completeAt = default(TimeSpan);
90                 _hasFailed = false;
91                 _exception = default(Exception);
92 
93                 _watch = _scheduler.StartStopwatch();
94 
95                 if (_parent._dueTimeA.HasValue)
96                 {
97                     _ready = false;
98 
99                     var dueTimeA = _parent._dueTimeA.Value;
100                     _cancelable.Disposable = _scheduler.Schedule(dueTimeA, Start);
101                 }
102                 else
103                 {
104                     _ready = true;
105 
106                     var dueTimeR = _parent._dueTimeR.Value;
107                     _delay = Scheduler.Normalize(dueTimeR);
108                 }
109 
110                 var sourceSubscription = new SingleAssignmentDisposable();
111                 _sourceSubscription = sourceSubscription;
112                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
113 
114                 return new CompositeDisposable(_sourceSubscription, _cancelable);
115             }
116 
Start()117             private void Start()
118             {
119                 var next = default(TimeSpan);
120                 var shouldRun = false;
121 
122                 lock (_gate)
123                 {
124                     _delay = _watch.Elapsed;
125 
126                     var oldQueue = _queue;
127                     _queue = new Queue<Reactive.TimeInterval<TSource>>();
128 
129                     if (oldQueue.Count > 0)
130                     {
131                         next = oldQueue.Peek().Interval;
132 
133                         while (oldQueue.Count > 0)
134                         {
135                             var item = oldQueue.Dequeue();
136                             _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
137                         }
138 
139                         shouldRun = true;
140                         _active = true;
141                     }
142 
143                     _ready = true;
144                 }
145 
146                 if (shouldRun)
147                 {
148                     _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
149                 }
150             }
151 
OnNext(TSource value)152             public void OnNext(TSource value)
153             {
154                 var next = _watch.Elapsed.Add(_delay);
155                 var shouldRun = false;
156 
157                 lock (_gate)
158                 {
159                     _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
160 
161                     shouldRun = _ready && !_active;
162                     _active = true;
163                 }
164 
165                 if (shouldRun)
166                 {
167                     _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
168                 }
169             }
170 
OnError(Exception error)171             public void OnError(Exception error)
172             {
173                 _sourceSubscription.Dispose();
174 
175                 var shouldRun = false;
176 
177                 lock (_gate)
178                 {
179                     _queue.Clear();
180 
181                     _exception = error;
182                     _hasFailed = true;
183 
184                     shouldRun = !_running;
185                 }
186 
187                 if (shouldRun)
188                 {
189                     base._observer.OnError(error);
190                     base.Dispose();
191                 }
192             }
193 
OnCompleted()194             public void OnCompleted()
195             {
196                 _sourceSubscription.Dispose();
197 
198                 var next = _watch.Elapsed.Add(_delay);
199                 var shouldRun = false;
200 
201                 lock (_gate)
202                 {
203                     _completeAt = next;
204                     _hasCompleted = true;
205 
206                     shouldRun = _ready && !_active;
207                     _active = true;
208                 }
209 
210                 if (shouldRun)
211                 {
212                     _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
213                 }
214             }
215 
            /// <summary>
            /// Recursive scheduler work item. Each invocation forwards at most one due value and
            /// then re-schedules itself through <paramref name="recurse"/> for whatever is left;
            /// a due completion or a recorded error is forwarded and the sink disposed.
            /// </summary>
            /// <param name="recurse">Re-schedules this method after the given relative time.</param>
            private void DrainQueue(Action<TimeSpan> recurse)
            {
                lock (_gate)
                {
                    // A failure was already recorded; skip this pass entirely.
                    if (_hasFailed)
                        return;
                    _running = true;
                }

                //
                // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
                // implementation, the loop below kept running while there was work for immediate dispatch,
                // potentially causing a long running work item on the target scheduler. With the addition
                // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
                // interface and perform different processing (see LongRunningImpl). To reduce the code
                // churn in the old loop code here, we set the shouldYield flag to true after the first
                // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
                //
                var shouldYield = false;

                while (true)
                {
                    var hasFailed = false;
                    var error = default(Exception);

                    var hasValue = false;
                    var value = default(TSource);
                    var hasCompleted = false;

                    var shouldRecurse = false;
                    var recurseDueTime = default(TimeSpan);

                    // Decide what to do under the gate; the observer is only called after releasing it.
                    lock (_gate)
                    {
                        if (_hasFailed)
                        {
                            // OnError arrived while this pass was running, so this pass forwards it.
                            error = _exception;
                            hasFailed = true;
                            _running = false;
                        }
                        else
                        {
                            var now = _watch.Elapsed;

                            if (_queue.Count > 0)
                            {
                                var nextDue = _queue.Peek().Interval;

                                if (nextDue.CompareTo(now) <= 0 && !shouldYield)
                                {
                                    value = _queue.Dequeue().Value;
                                    hasValue = true;
                                }
                                else
                                {
                                    // Head not yet due, or one value was already forwarded in this pass:
                                    // come back later (immediately if the head is overdue).
                                    shouldRecurse = true;
                                    recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
                                    _running = false;
                                }
                            }
                            else if (_hasCompleted)
                            {
                                if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
                                {
                                    hasCompleted = true;
                                }
                                else
                                {
                                    shouldRecurse = true;
                                    recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
                                    _running = false;
                                }
                            }
                            else
                            {
                                // Nothing left to do: go idle so the next OnNext/OnCompleted schedules a new drain.
                                _running = false;
                                _active = false;
                            }
                        }
                    } /* lock (_gate) */

                    if (hasValue)
                    {
                        base._observer.OnNext(value);
                        shouldYield = true;
                    }
                    else
                    {
                        if (hasCompleted)
                        {
                            base._observer.OnCompleted();
                            base.Dispose();
                        }
                        else if (hasFailed)
                        {
                            base._observer.OnError(error);
                            base.Dispose();
                        }
                        else if (shouldRecurse)
                        {
                            recurse(recurseDueTime);
                        }

                        return;
                    }
                } /* while (true) */
            }
323         }
324 
325         class LongRunningImpl : Sink<TSource>, IObserver<TSource>
326         {
327             private readonly Delay<TSource> _parent;
328 
LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)329             public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
330                 : base(observer, cancel)
331             {
332                 _parent = parent;
333             }
334 
335             private IDisposable _sourceSubscription;
336             private SerialDisposable _cancelable;
337             private TimeSpan _delay;
338             private IStopwatch _watch;
339 
340             private object _gate;
341 #if !NO_CDS
342             private SemaphoreSlim _evt;
343             private CancellationTokenSource _stop;
344 #else
345             private Semaphore _evt;
346             private bool _stopped;
347             private ManualResetEvent _stop;
348 #endif
349             private Queue<System.Reactive.TimeInterval<TSource>> _queue;
350             private bool _hasCompleted;
351             private TimeSpan _completeAt;
352             private bool _hasFailed;
353             private Exception _exception;
354 
Run()355             public IDisposable Run()
356             {
357                 _cancelable = new SerialDisposable();
358 
359                 _gate = new object();
360 #if !NO_CDS
361                 _evt = new SemaphoreSlim(0);
362 #else
363                 _evt = new Semaphore(0, int.MaxValue);
364 #endif
365                 _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
366                 _hasCompleted = false;
367                 _completeAt = default(TimeSpan);
368                 _hasFailed = false;
369                 _exception = default(Exception);
370 
371                 _watch = _parent._scheduler.StartStopwatch();
372 
373                 if (_parent._dueTimeA.HasValue)
374                 {
375                     var dueTimeA = _parent._dueTimeA.Value;
376                     _cancelable.Disposable = _parent._scheduler.Schedule(dueTimeA, Start);
377                 }
378                 else
379                 {
380                     var dueTimeR = _parent._dueTimeR.Value;
381                     _delay = Scheduler.Normalize(dueTimeR);
382                     ScheduleDrain();
383                 }
384 
385                 var sourceSubscription = new SingleAssignmentDisposable();
386                 _sourceSubscription = sourceSubscription;
387                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
388 
389                 return new CompositeDisposable(_sourceSubscription, _cancelable);
390             }
391 
Start()392             private void Start()
393             {
394                 lock (_gate)
395                 {
396                     _delay = _watch.Elapsed;
397 
398                     var oldQueue = _queue;
399                     _queue = new Queue<Reactive.TimeInterval<TSource>>();
400 
401                     while (oldQueue.Count > 0)
402                     {
403                         var item = oldQueue.Dequeue();
404                         _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
405                     }
406                 }
407 
408                 ScheduleDrain();
409             }
410 
ScheduleDrain()411             private void ScheduleDrain()
412             {
413 #if !NO_CDS
414                 _stop = new CancellationTokenSource();
415                 _cancelable.Disposable = Disposable.Create(() => _stop.Cancel());
416 #else
417                 _stop = new ManualResetEvent(false);
418                 _cancelable.Disposable = Disposable.Create(() =>
419                 {
420                     _stopped = true;
421                     _stop.Set();
422                     _evt.Release();
423                 });
424 #endif
425 
426                 _parent._scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
427             }
428 
OnNext(TSource value)429             public void OnNext(TSource value)
430             {
431                 var next = _watch.Elapsed.Add(_delay);
432 
433                 lock (_gate)
434                 {
435                     _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
436 
437                     _evt.Release();
438                 }
439             }
440 
OnError(Exception error)441             public void OnError(Exception error)
442             {
443                 _sourceSubscription.Dispose();
444 
445                 lock (_gate)
446                 {
447                     _queue.Clear();
448 
449                     _exception = error;
450                     _hasFailed = true;
451 
452                     _evt.Release();
453                 }
454             }
455 
OnCompleted()456             public void OnCompleted()
457             {
458                 _sourceSubscription.Dispose();
459 
460                 var next = _watch.Elapsed.Add(_delay);
461 
462                 lock (_gate)
463                 {
464                     _completeAt = next;
465                     _hasCompleted = true;
466 
467                     _evt.Release();
468                 }
469             }
470 
            /// <summary>
            /// Body of the long-running work item. Each iteration blocks until a notification has
            /// been signalled through <c>_evt</c>, takes one item (or the terminal state) under the
            /// gate, waits out its remaining delay if any, and forwards it. The loop ends when the
            /// stop signal is raised or after a terminal notification has been forwarded.
            /// </summary>
            /// <param name="cancel">
            /// Handle supplied by the long-running scheduler; not consulted here, cancellation is
            /// observed through <c>_stop</c> instead.
            /// </param>
            private void DrainQueue(ICancelable cancel)
            {
                while (true)
                {
                    // Wait for OnNext/OnError/OnCompleted to release the semaphore, or for the stop signal.
#if !NO_CDS
                    try
                    {
                        _evt.Wait(_stop.Token);
                    }
                    catch (OperationCanceledException)
                    {
                        return;
                    }
#else
                    _evt.WaitOne();
                    if (_stopped)
                        return;
#endif

                    var hasFailed = false;
                    var error = default(Exception);

                    var hasValue = false;
                    var value = default(TSource);
                    var hasCompleted = false;

                    var shouldWait = false;
                    var waitTime = default(TimeSpan);

                    // Decide what to forward under the gate; waiting and observer calls happen outside it.
                    lock (_gate)
                    {
                        if (_hasFailed)
                        {
                            error = _exception;
                            hasFailed = true;
                        }
                        else
                        {
                            var now = _watch.Elapsed;

                            if (_queue.Count > 0)
                            {
                                // The value is dequeued right away; if it is not due yet the loop
                                // waits below before forwarding it.
                                var next = _queue.Dequeue();

                                hasValue = true;
                                value = next.Value;

                                var nextDue = next.Interval;
                                if (nextDue.CompareTo(now) > 0)
                                {
                                    shouldWait = true;
                                    waitTime = Scheduler.Normalize(nextDue.Subtract(now));
                                }
                            }
                            else if (_hasCompleted)
                            {
                                hasCompleted = true;

                                if (_completeAt.CompareTo(now) > 0)
                                {
                                    shouldWait = true;
                                    waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
                                }
                            }
                        }
                    } /* lock (_gate) */

                    if (shouldWait)
                    {
                        // Block until the scheduler fires the timer action after waitTime, or until stopped.
#if !NO_CDS
                        var timer = new ManualResetEventSlim();
                        _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });

                        try
                        {
                            timer.Wait(_stop.Token);
                        }
                        catch (OperationCanceledException)
                        {
                            return;
                        }
#else
                        var timer = new ManualResetEvent(false);
                        _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
                        // Index 1 is _stop: the sink was disposed while waiting.
                        if (WaitHandle.WaitAny(new[] { timer, _stop }) == 1)
                            return;
#endif
                    }

                    if (hasValue)
                    {
                        base._observer.OnNext(value);
                    }
                    else
                    {
                        if (hasCompleted)
                        {
                            base._observer.OnCompleted();
                            base.Dispose();
                        }
                        else if (hasFailed)
                        {
                            base._observer.OnError(error);
                            base.Dispose();
                        }

                        return;
                    }
                }
            }
581         }
582     }
583 
584     class Delay<TSource, TDelay> : Producer<TSource>
585     {
586         private readonly IObservable<TSource> _source;
587         private readonly IObservable<TDelay> _subscriptionDelay;
588         private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
589 
Delay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)590         public Delay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
591         {
592             _source = source;
593             _subscriptionDelay = subscriptionDelay;
594             _delaySelector = delaySelector;
595         }
596 
Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)597         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
598         {
599             var sink = new _(this, observer, cancel);
600             setSink(sink);
601             return sink.Run();
602         }
603 
604         class _ : Sink<TSource>, IObserver<TSource>
605         {
606             private readonly Delay<TSource, TDelay> _parent;
607 
_(Delay<TSource, TDelay> parent, IObserver<TSource> observer, IDisposable cancel)608             public _(Delay<TSource, TDelay> parent, IObserver<TSource> observer, IDisposable cancel)
609                 : base(observer, cancel)
610             {
611                 _parent = parent;
612             }
613 
614             private CompositeDisposable _delays;
615             private object _gate;
616             private bool _atEnd;
617             private SerialDisposable _subscription;
618 
Run()619             public IDisposable Run()
620             {
621                 _delays = new CompositeDisposable();
622                 _gate = new object();
623                 _atEnd = false;
624                 _subscription = new SerialDisposable();
625 
626                 if (_parent._subscriptionDelay == null)
627                 {
628                     Start();
629                 }
630                 else
631                 {
632                     _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
633                 }
634 
635                 return new CompositeDisposable(_subscription, _delays);
636             }
637 
Start()638             private void Start()
639             {
640                 _subscription.Disposable = _parent._source.SubscribeSafe(this);
641             }
642 
OnNext(TSource value)643             public void OnNext(TSource value)
644             {
645                 var delay = default(IObservable<TDelay>);
646                 try
647                 {
648                     delay = _parent._delaySelector(value);
649                 }
650                 catch (Exception error)
651                 {
652                     lock (_gate)
653                     {
654                         base._observer.OnError(error);
655                         base.Dispose();
656                     }
657 
658                     return;
659                 }
660 
661                 var d = new SingleAssignmentDisposable();
662                 _delays.Add(d);
663                 d.Disposable = delay.SubscribeSafe(new Delta(this, value, d));
664             }
665 
OnError(Exception error)666             public void OnError(Exception error)
667             {
668                 lock (_gate)
669                 {
670                     base._observer.OnError(error);
671                     base.Dispose();
672                 }
673             }
674 
OnCompleted()675             public void OnCompleted()
676             {
677                 lock (_gate)
678                 {
679                     _atEnd = true;
680                     _subscription.Dispose();
681 
682                     CheckDone();
683                 }
684             }
685 
CheckDone()686             private void CheckDone()
687             {
688                 if (_atEnd && _delays.Count == 0)
689                 {
690                     base._observer.OnCompleted();
691                     base.Dispose();
692                 }
693             }
694 
695             class SubscriptionDelay : IObserver<TDelay>
696             {
697                 private readonly _ _parent;
698 
SubscriptionDelay(_ parent)699                 public SubscriptionDelay(_ parent)
700                 {
701                     _parent = parent;
702                 }
703 
OnNext(TDelay value)704                 public void OnNext(TDelay value)
705                 {
706                     _parent.Start();
707                 }
708 
OnError(Exception error)709                 public void OnError(Exception error)
710                 {
711                     _parent._observer.OnError(error);
712                     _parent.Dispose();
713                 }
714 
OnCompleted()715                 public void OnCompleted()
716                 {
717                     _parent.Start();
718                 }
719             }
720 
721             class Delta : IObserver<TDelay>
722             {
723                 private readonly _ _parent;
724                 private readonly TSource _value;
725                 private readonly IDisposable _self;
726 
Delta(_ parent, TSource value, IDisposable self)727                 public Delta(_ parent, TSource value, IDisposable self)
728                 {
729                     _parent = parent;
730                     _value = value;
731                     _self = self;
732                 }
733 
OnNext(TDelay value)734                 public void OnNext(TDelay value)
735                 {
736                     lock (_parent._gate)
737                     {
738                         _parent._observer.OnNext(_value);
739 
740                         _parent._delays.Remove(_self);
741                         _parent.CheckDone();
742                     }
743                 }
744 
OnError(Exception error)745                 public void OnError(Exception error)
746                 {
747                     lock (_parent._gate)
748                     {
749                         _parent._observer.OnError(error);
750                         _parent.Dispose();
751                     }
752                 }
753 
OnCompleted()754                 public void OnCompleted()
755                 {
756                     lock (_parent._gate)
757                     {
758                         _parent._observer.OnNext(_value);
759 
760                         _parent._delays.Remove(_self);
761                         _parent.CheckDone();
762                     }
763                 }
764             }
765         }
766     }
767 }
768 #endif