1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System;
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.Reactive.Concurrency;
8 using System.Reactive.Disposables;
9 using System.Threading;
10 
11 namespace System.Reactive.Linq.ObservableImpl
12 {
13     class Buffer<TSource> : Producer<IList<TSource>>
14     {
15         private readonly IObservable<TSource> _source;
16         private readonly int _count;
17         private readonly int _skip;
18 
19         private readonly TimeSpan _timeSpan;
20         private readonly TimeSpan _timeShift;
21         private readonly IScheduler _scheduler;
22 
Buffer(IObservable<TSource> source, int count, int skip)23         public Buffer(IObservable<TSource> source, int count, int skip)
24         {
25             _source = source;
26             _count = count;
27             _skip = skip;
28         }
29 
Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)30         public Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
31         {
32             _source = source;
33             _timeSpan = timeSpan;
34             _timeShift = timeShift;
35             _scheduler = scheduler;
36         }
37 
Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)38         public Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
39         {
40             _source = source;
41             _timeSpan = timeSpan;
42             _count = count;
43             _scheduler = scheduler;
44         }
45 
Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)46         protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
47         {
48             if (_scheduler == null)
49             {
50                 var sink = new _(this, observer, cancel);
51                 setSink(sink);
52                 return sink.Run();
53             }
54             else if (_count > 0)
55             {
56                 var sink = new Impl(this, observer, cancel);
57                 setSink(sink);
58                 return sink.Run();
59             }
60             else
61             {
62                 if (_timeSpan == _timeShift)
63                 {
64                     var sink = new BufferTimeShift(this, observer, cancel);
65                     setSink(sink);
66                     return sink.Run();
67                 }
68                 else
69                 {
70                     var sink = new BufferImpl(this, observer, cancel);
71                     setSink(sink);
72                     return sink.Run();
73                 }
74             }
75         }
76 
77         class _ : Sink<IList<TSource>>, IObserver<TSource>
78         {
79             private readonly Buffer<TSource> _parent;
80 
_(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)81             public _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
82                 : base(observer, cancel)
83             {
84                 _parent = parent;
85             }
86 
87             private Queue<IList<TSource>> _queue;
88             private int _n;
89 
Run()90             public IDisposable Run()
91             {
92                 _queue = new Queue<IList<TSource>>();
93                 _n = 0;
94 
95                 CreateWindow();
96                 return _parent._source.SubscribeSafe(this);
97             }
98 
CreateWindow()99             private void CreateWindow()
100             {
101                 var s = new List<TSource>();
102                 _queue.Enqueue(s);
103             }
104 
OnNext(TSource value)105             public void OnNext(TSource value)
106             {
107                 foreach (var s in _queue)
108                     s.Add(value);
109 
110                 var c = _n - _parent._count + 1;
111                 if (c >= 0 && c % _parent._skip == 0)
112                 {
113                     var s = _queue.Dequeue();
114                     if (s.Count > 0)
115                         base._observer.OnNext(s);
116                 }
117 
118                 _n++;
119                 if (_n % _parent._skip == 0)
120                     CreateWindow();
121             }
122 
OnError(Exception error)123             public void OnError(Exception error)
124             {
125                 while (_queue.Count > 0)
126                     _queue.Dequeue().Clear();
127 
128                 base._observer.OnError(error);
129                 base.Dispose();
130             }
131 
OnCompleted()132             public void OnCompleted()
133             {
134                 while (_queue.Count > 0)
135                 {
136                     var s = _queue.Dequeue();
137                     if (s.Count > 0)
138                         base._observer.OnNext(s);
139                 }
140 
141                 base._observer.OnCompleted();
142                 base.Dispose();
143             }
144         }
145 
146         class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>
147         {
148             private readonly Buffer<TSource> _parent;
149 
BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)150             public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
151                 : base(observer, cancel)
152             {
153                 _parent = parent;
154             }
155 
156             private TimeSpan _totalTime;
157             private TimeSpan _nextShift;
158             private TimeSpan _nextSpan;
159 
160             private object _gate;
161             private Queue<List<TSource>> _q;
162 
163             private SerialDisposable _timerD;
164 
Run()165             public IDisposable Run()
166             {
167                 _totalTime = TimeSpan.Zero;
168                 _nextShift = _parent._timeShift;
169                 _nextSpan = _parent._timeSpan;
170 
171                 _gate = new object();
172                 _q = new Queue<List<TSource>>();
173 
174                 _timerD = new SerialDisposable();
175 
176                 CreateWindow();
177                 CreateTimer();
178 
179                 var subscription = _parent._source.SubscribeSafe(this);
180 
181                 return new CompositeDisposable { _timerD, subscription };
182             }
183 
CreateWindow()184             private void CreateWindow()
185             {
186                 var s = new List<TSource>();
187                 _q.Enqueue(s);
188             }
189 
CreateTimer()190             private void CreateTimer()
191             {
192                 var m = new SingleAssignmentDisposable();
193                 _timerD.Disposable = m;
194 
195                 var isSpan = false;
196                 var isShift = false;
197                 if (_nextSpan == _nextShift)
198                 {
199                     isSpan = true;
200                     isShift = true;
201                 }
202                 else if (_nextSpan < _nextShift)
203                     isSpan = true;
204                 else
205                     isShift = true;
206 
207                 var newTotalTime = isSpan ? _nextSpan : _nextShift;
208                 var ts = newTotalTime - _totalTime;
209                 _totalTime = newTotalTime;
210 
211                 if (isSpan)
212                     _nextSpan += _parent._timeShift;
213                 if (isShift)
214                     _nextShift += _parent._timeShift;
215 
216                 m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
217             }
218 
219             struct State
220             {
221                 public bool isSpan;
222                 public bool isShift;
223             }
224 
Tick(IScheduler self, State state)225             private IDisposable Tick(IScheduler self, State state)
226             {
227                 lock (_gate)
228                 {
229                     //
230                     // Before v2, the two operations below were reversed. This doesn't have an observable
231                     // difference for Buffer, but is done to keep code consistent with Window, where we
232                     // took a breaking change in v2 to ensure consistency across overloads. For more info,
233                     // see the comment in Tick for Window.
234                     //
235                     if (state.isSpan)
236                     {
237                         var s = _q.Dequeue();
238                         base._observer.OnNext(s);
239                     }
240 
241                     if (state.isShift)
242                     {
243                         CreateWindow();
244                     }
245                 }
246 
247                 CreateTimer();
248 
249                 return Disposable.Empty;
250             }
251 
OnNext(TSource value)252             public void OnNext(TSource value)
253             {
254                 lock (_gate)
255                 {
256                     foreach (var s in _q)
257                         s.Add(value);
258                 }
259             }
260 
OnError(Exception error)261             public void OnError(Exception error)
262             {
263                 lock (_gate)
264                 {
265                     while (_q.Count > 0)
266                         _q.Dequeue().Clear();
267 
268                     base._observer.OnError(error);
269                     base.Dispose();
270                 }
271             }
272 
OnCompleted()273             public void OnCompleted()
274             {
275                 lock (_gate)
276                 {
277                     while (_q.Count > 0)
278                         base._observer.OnNext(_q.Dequeue());
279 
280                     base._observer.OnCompleted();
281                     base.Dispose();
282                 }
283             }
284         }
285 
286         class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>
287         {
288             private readonly Buffer<TSource> _parent;
289 
BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)290             public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
291                 : base(observer, cancel)
292             {
293                 _parent = parent;
294             }
295 
296             private object _gate;
297             private List<TSource> _list;
298 
Run()299             public IDisposable Run()
300             {
301                 _gate = new object();
302                 _list = new List<TSource>();
303 
304                 var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
305                 var s = _parent._source.SubscribeSafe(this);
306 
307                 return new CompositeDisposable(d, s);
308             }
309 
Tick()310             private void Tick()
311             {
312                 lock (_gate)
313                 {
314                     base._observer.OnNext(_list);
315                     _list = new List<TSource>();
316                 }
317             }
318 
OnNext(TSource value)319             public void OnNext(TSource value)
320             {
321                 lock (_gate)
322                 {
323                     _list.Add(value);
324                 }
325             }
326 
OnError(Exception error)327             public void OnError(Exception error)
328             {
329                 lock (_gate)
330                 {
331                     _list.Clear();
332 
333                     base._observer.OnError(error);
334                     base.Dispose();
335                 }
336             }
337 
OnCompleted()338             public void OnCompleted()
339             {
340                 lock (_gate)
341                 {
342                     base._observer.OnNext(_list);
343                     base._observer.OnCompleted();
344                     base.Dispose();
345                 }
346             }
347         }
348 
349         class Impl : Sink<IList<TSource>>, IObserver<TSource>
350         {
351             private readonly Buffer<TSource> _parent;
352 
Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)353             public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
354                 : base(observer, cancel)
355             {
356                 _parent = parent;
357             }
358 
359             private object _gate;
360             private IList<TSource> _s;
361             private int _n;
362             private int _windowId;
363 
364             private SerialDisposable _timerD;
365 
Run()366             public IDisposable Run()
367             {
368                 _gate = new object();
369                 _s = default(IList<TSource>);
370                 _n = 0;
371                 _windowId = 0;
372 
373                 _timerD = new SerialDisposable();
374 
375                 _s = new List<TSource>();
376                 CreateTimer(0);
377 
378                 var subscription = _parent._source.SubscribeSafe(this);
379 
380                 return new CompositeDisposable { _timerD, subscription };
381             }
382 
CreateTimer(int id)383             private void CreateTimer(int id)
384             {
385                 var m = new SingleAssignmentDisposable();
386                 _timerD.Disposable = m;
387 
388                 m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
389             }
390 
Tick(IScheduler self, int id)391             private IDisposable Tick(IScheduler self, int id)
392             {
393                 var d = Disposable.Empty;
394 
395                 var newId = 0;
396                 lock (_gate)
397                 {
398                     if (id != _windowId)
399                         return d;
400 
401                     _n = 0;
402                     newId = ++_windowId;
403 
404                     var res = _s;
405                     _s = new List<TSource>();
406                     base._observer.OnNext(res);
407                 }
408 
409                 CreateTimer(newId);
410 
411                 return d;
412             }
413 
OnNext(TSource value)414             public void OnNext(TSource value)
415             {
416                 var newWindow = false;
417                 var newId = 0;
418 
419                 lock (_gate)
420                 {
421                     _s.Add(value);
422 
423                     _n++;
424                     if (_n == _parent._count)
425                     {
426                         newWindow = true;
427                         _n = 0;
428                         newId = ++_windowId;
429 
430                         var res = _s;
431                         _s = new List<TSource>();
432                         base._observer.OnNext(res);
433                     }
434                 }
435 
436                 if (newWindow)
437                     CreateTimer(newId);
438             }
439 
OnError(Exception error)440             public void OnError(Exception error)
441             {
442                 lock (_gate)
443                 {
444                     _s.Clear();
445                     base._observer.OnError(error);
446                     base.Dispose();
447                 }
448             }
449 
OnCompleted()450             public void OnCompleted()
451             {
452                 lock (_gate)
453                 {
454                     base._observer.OnNext(_s);
455                     base._observer.OnCompleted();
456                     base.Dispose();
457                 }
458             }
459         }
460     }
461 
462     class Buffer<TSource, TBufferClosing> : Producer<IList<TSource>>
463     {
464         private readonly IObservable<TSource> _source;
465         private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
466         private readonly IObservable<TBufferClosing> _bufferBoundaries;
467 
Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)468         public Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
469         {
470             _source = source;
471             _bufferClosingSelector = bufferClosingSelector;
472         }
473 
Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)474         public Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)
475         {
476             _source = source;
477             _bufferBoundaries = bufferBoundaries;
478         }
479 
Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)480         protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
481         {
482             if (_bufferClosingSelector != null)
483             {
484                 var sink = new _(this, observer, cancel);
485                 setSink(sink);
486                 return sink.Run();
487             }
488             else
489             {
490                 var sink = new Beta(this, observer, cancel);
491                 setSink(sink);
492                 return sink.Run();
493             }
494         }
495 
496         class _ : Sink<IList<TSource>>, IObserver<TSource>
497         {
498             private readonly Buffer<TSource, TBufferClosing> _parent;
499 
_(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)500             public _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
501                 : base(observer, cancel)
502             {
503                 _parent = parent;
504             }
505 
506             private IList<TSource> _buffer;
507             private object _gate;
508             private AsyncLock _bufferGate;
509 
510             private SerialDisposable _m;
511 
Run()512             public IDisposable Run()
513             {
514                 _buffer = new List<TSource>();
515                 _gate = new object();
516                 _bufferGate = new AsyncLock();
517 
518                 _m = new SerialDisposable();
519                 var groupDisposable = new CompositeDisposable(2) { _m };
520 
521                 groupDisposable.Add(_parent._source.SubscribeSafe(this));
522 
523                 _bufferGate.Wait(CreateBufferClose);
524 
525                 return groupDisposable;
526             }
527 
CreateBufferClose()528             private void CreateBufferClose()
529             {
530                 var bufferClose = default(IObservable<TBufferClosing>);
531                 try
532                 {
533                     bufferClose = _parent._bufferClosingSelector();
534                 }
535                 catch (Exception exception)
536                 {
537                     lock (_gate)
538                     {
539                         base._observer.OnError(exception);
540                         base.Dispose();
541                     }
542                     return;
543                 }
544 
545                 var closingSubscription = new SingleAssignmentDisposable();
546                 _m.Disposable = closingSubscription;
547                 closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription));
548             }
549 
CloseBuffer(IDisposable closingSubscription)550             private void CloseBuffer(IDisposable closingSubscription)
551             {
552                 closingSubscription.Dispose();
553 
554                 lock (_gate)
555                 {
556                     var res = _buffer;
557                     _buffer = new List<TSource>();
558                     base._observer.OnNext(res);
559                 }
560 
561                 _bufferGate.Wait(CreateBufferClose);
562             }
563 
564             class Omega : IObserver<TBufferClosing>
565             {
566                 private readonly _ _parent;
567                 private readonly IDisposable _self;
568 
Omega(_ parent, IDisposable self)569                 public Omega(_ parent, IDisposable self)
570                 {
571                     _parent = parent;
572                     _self = self;
573                 }
574 
OnNext(TBufferClosing value)575                 public void OnNext(TBufferClosing value)
576                 {
577                     _parent.CloseBuffer(_self);
578                 }
579 
OnError(Exception error)580                 public void OnError(Exception error)
581                 {
582                     _parent.OnError(error);
583                 }
584 
OnCompleted()585                 public void OnCompleted()
586                 {
587                     _parent.CloseBuffer(_self);
588                 }
589             }
590 
OnNext(TSource value)591             public void OnNext(TSource value)
592             {
593                 lock (_gate)
594                 {
595                     _buffer.Add(value);
596                 }
597             }
598 
OnError(Exception error)599             public void OnError(Exception error)
600             {
601                 lock (_gate)
602                 {
603                     _buffer.Clear();
604                     base._observer.OnError(error);
605                     base.Dispose();
606                 }
607             }
608 
OnCompleted()609             public void OnCompleted()
610             {
611                 lock (_gate)
612                 {
613                     base._observer.OnNext(_buffer);
614                     base._observer.OnCompleted();
615                     base.Dispose();
616                 }
617             }
618         }
619 
620         class Beta : Sink<IList<TSource>>, IObserver<TSource>
621         {
622             private readonly Buffer<TSource, TBufferClosing> _parent;
623 
Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)624             public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
625                 : base(observer, cancel)
626             {
627                 _parent = parent;
628             }
629 
630             private IList<TSource> _buffer;
631             private object _gate;
632 
633             private RefCountDisposable _refCountDisposable;
634 
Run()635             public IDisposable Run()
636             {
637                 _buffer = new List<TSource>();
638                 _gate = new object();
639 
640                 var d = new CompositeDisposable(2);
641                 _refCountDisposable = new RefCountDisposable(d);
642 
643                 d.Add(_parent._source.SubscribeSafe(this));
644                 d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));
645 
646                 return _refCountDisposable;
647             }
648 
649             class Omega : IObserver<TBufferClosing>
650             {
651                 private readonly Beta _parent;
652 
Omega(Beta parent)653                 public Omega(Beta parent)
654                 {
655                     _parent = parent;
656                 }
657 
OnNext(TBufferClosing value)658                 public void OnNext(TBufferClosing value)
659                 {
660                     lock (_parent._gate)
661                     {
662                         var res = _parent._buffer;
663                         _parent._buffer = new List<TSource>();
664                         _parent._observer.OnNext(res);
665                     }
666                 }
667 
OnError(Exception error)668                 public void OnError(Exception error)
669                 {
670                     _parent.OnError(error);
671                 }
672 
OnCompleted()673                 public void OnCompleted()
674                 {
675                     _parent.OnCompleted();
676                 }
677             }
678 
OnNext(TSource value)679             public void OnNext(TSource value)
680             {
681                 lock (_gate)
682                 {
683                     _buffer.Add(value);
684                 }
685             }
686 
OnError(Exception error)687             public void OnError(Exception error)
688             {
689                 lock (_gate)
690                 {
691                     _buffer.Clear();
692                     base._observer.OnError(error);
693                     base.Dispose();
694                 }
695             }
696 
OnCompleted()697             public void OnCompleted()
698             {
699                 lock (_gate)
700                 {
701                     base._observer.OnNext(_buffer);
702                     base._observer.OnCompleted();
703                     base.Dispose();
704                 }
705             }
706         }
707     }
708 }
709 #endif