1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #if !NO_PERF 4 using System; 5 using System.Collections.Generic; 6 using System.Diagnostics; 7 using System.Reactive.Concurrency; 8 using System.Reactive.Disposables; 9 using System.Threading; 10 11 namespace System.Reactive.Linq.ObservableImpl 12 { 13 class Buffer<TSource> : Producer<IList<TSource>> 14 { 15 private readonly IObservable<TSource> _source; 16 private readonly int _count; 17 private readonly int _skip; 18 19 private readonly TimeSpan _timeSpan; 20 private readonly TimeSpan _timeShift; 21 private readonly IScheduler _scheduler; 22 Buffer(IObservable<TSource> source, int count, int skip)23 public Buffer(IObservable<TSource> source, int count, int skip) 24 { 25 _source = source; 26 _count = count; 27 _skip = skip; 28 } 29 Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)30 public Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) 31 { 32 _source = source; 33 _timeSpan = timeSpan; 34 _timeShift = timeShift; 35 _scheduler = scheduler; 36 } 37 Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)38 public Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) 39 { 40 _source = source; 41 _timeSpan = timeSpan; 42 _count = count; 43 _scheduler = scheduler; 44 } 45 Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)46 protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink) 47 { 48 if (_scheduler == null) 49 { 50 var sink = new _(this, observer, cancel); 51 setSink(sink); 52 return sink.Run(); 53 } 54 else if (_count > 0) 55 { 56 var sink = new Impl(this, observer, cancel); 57 setSink(sink); 58 return sink.Run(); 59 } 60 else 61 { 62 if (_timeSpan == _timeShift) 63 { 64 var sink = new BufferTimeShift(this, observer, cancel); 65 setSink(sink); 66 return sink.Run(); 67 } 68 else 69 { 70 var sink = new BufferImpl(this, observer, cancel); 71 setSink(sink); 72 return sink.Run(); 73 } 74 } 75 } 76 77 class _ : Sink<IList<TSource>>, IObserver<TSource> 78 { 79 private readonly Buffer<TSource> _parent; 80 _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)81 public _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) 82 : base(observer, cancel) 83 { 84 _parent = parent; 85 } 86 87 private Queue<IList<TSource>> _queue; 88 private int _n; 89 Run()90 public IDisposable Run() 91 { 92 _queue = new Queue<IList<TSource>>(); 93 _n = 0; 94 95 CreateWindow(); 96 return _parent._source.SubscribeSafe(this); 97 } 98 CreateWindow()99 private void CreateWindow() 100 { 101 var s = new List<TSource>(); 102 _queue.Enqueue(s); 103 } 104 OnNext(TSource value)105 public void OnNext(TSource value) 106 { 107 foreach (var s in _queue) 108 s.Add(value); 109 110 var c = _n - _parent._count + 1; 111 if (c >= 0 && c % _parent._skip == 0) 112 { 113 var s = _queue.Dequeue(); 114 if (s.Count > 0) 115 base._observer.OnNext(s); 116 } 117 118 _n++; 119 if (_n % _parent._skip == 0) 120 CreateWindow(); 121 } 122 OnError(Exception error)123 public void OnError(Exception error) 124 { 125 while (_queue.Count > 0) 126 _queue.Dequeue().Clear(); 127 128 base._observer.OnError(error); 129 base.Dispose(); 130 } 131 OnCompleted()132 public void OnCompleted() 133 { 134 while (_queue.Count > 0) 135 { 136 var s = _queue.Dequeue(); 137 if (s.Count > 0) 138 base._observer.OnNext(s); 139 } 140 141 base._observer.OnCompleted(); 142 base.Dispose(); 143 } 144 } 145 146 class BufferImpl : Sink<IList<TSource>>, IObserver<TSource> 147 { 148 private readonly Buffer<TSource> _parent; 149 BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)150 public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) 151 : base(observer, cancel) 152 { 153 _parent = parent; 154 } 155 156 private TimeSpan _totalTime; 157 private TimeSpan _nextShift; 158 private TimeSpan _nextSpan; 159 160 private object _gate; 161 private Queue<List<TSource>> _q; 162 163 private SerialDisposable _timerD; 164 Run()165 public IDisposable Run() 166 { 167 _totalTime = TimeSpan.Zero; 168 _nextShift = _parent._timeShift; 169 _nextSpan = _parent._timeSpan; 170 171 _gate = new object(); 172 _q = new Queue<List<TSource>>(); 173 174 _timerD = new SerialDisposable(); 175 176 CreateWindow(); 177 CreateTimer(); 178 179 var subscription = _parent._source.SubscribeSafe(this); 180 181 return new CompositeDisposable { _timerD, subscription }; 182 } 183 CreateWindow()184 private void CreateWindow() 185 { 186 var s = new List<TSource>(); 187 _q.Enqueue(s); 188 } 189 CreateTimer()190 private void CreateTimer() 191 { 192 var m = new SingleAssignmentDisposable(); 193 _timerD.Disposable = m; 194 195 var isSpan = false; 196 var isShift = false; 197 if (_nextSpan == _nextShift) 198 { 199 isSpan = true; 200 isShift = true; 201 } 202 else if (_nextSpan < _nextShift) 203 isSpan = true; 204 else 205 isShift = true; 206 207 var newTotalTime = isSpan ? _nextSpan : _nextShift; 208 var ts = newTotalTime - _totalTime; 209 _totalTime = newTotalTime; 210 211 if (isSpan) 212 _nextSpan += _parent._timeShift; 213 if (isShift) 214 _nextShift += _parent._timeShift; 215 216 m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick); 217 } 218 219 struct State 220 { 221 public bool isSpan; 222 public bool isShift; 223 } 224 Tick(IScheduler self, State state)225 private IDisposable Tick(IScheduler self, State state) 226 { 227 lock (_gate) 228 { 229 // 230 // Before v2, the two operations below were reversed. This doesn't have an observable 231 // difference for Buffer, but is done to keep code consistent with Window, where we 232 // took a breaking change in v2 to ensure consistency across overloads. For more info, 233 // see the comment in Tick for Window. 234 // 235 if (state.isSpan) 236 { 237 var s = _q.Dequeue(); 238 base._observer.OnNext(s); 239 } 240 241 if (state.isShift) 242 { 243 CreateWindow(); 244 } 245 } 246 247 CreateTimer(); 248 249 return Disposable.Empty; 250 } 251 OnNext(TSource value)252 public void OnNext(TSource value) 253 { 254 lock (_gate) 255 { 256 foreach (var s in _q) 257 s.Add(value); 258 } 259 } 260 OnError(Exception error)261 public void OnError(Exception error) 262 { 263 lock (_gate) 264 { 265 while (_q.Count > 0) 266 _q.Dequeue().Clear(); 267 268 base._observer.OnError(error); 269 base.Dispose(); 270 } 271 } 272 OnCompleted()273 public void OnCompleted() 274 { 275 lock (_gate) 276 { 277 while (_q.Count > 0) 278 base._observer.OnNext(_q.Dequeue()); 279 280 base._observer.OnCompleted(); 281 base.Dispose(); 282 } 283 } 284 } 285 286 class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource> 287 { 288 private readonly Buffer<TSource> _parent; 289 BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)290 public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) 291 : base(observer, cancel) 292 { 293 _parent = parent; 294 } 295 296 private object _gate; 297 private List<TSource> _list; 298 Run()299 public IDisposable Run() 300 { 301 _gate = new object(); 302 _list = new List<TSource>(); 303 304 var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick); 305 var s = _parent._source.SubscribeSafe(this); 306 307 return new CompositeDisposable(d, s); 308 } 309 Tick()310 private void Tick() 311 { 312 lock (_gate) 313 { 314 base._observer.OnNext(_list); 315 _list = new List<TSource>(); 316 } 317 } 318 OnNext(TSource value)319 public void OnNext(TSource value) 320 { 321 lock (_gate) 322 { 323 _list.Add(value); 324 } 325 } 326 OnError(Exception error)327 public void OnError(Exception error) 328 { 329 lock (_gate) 330 { 331 _list.Clear(); 332 333 base._observer.OnError(error); 334 base.Dispose(); 335 } 336 } 337 OnCompleted()338 public void OnCompleted() 339 { 340 lock (_gate) 341 { 342 base._observer.OnNext(_list); 343 base._observer.OnCompleted(); 344 base.Dispose(); 345 } 346 } 347 } 348 349 class Impl : Sink<IList<TSource>>, IObserver<TSource> 350 { 351 private readonly Buffer<TSource> _parent; 352 Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)353 public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) 354 : base(observer, cancel) 355 { 356 _parent = parent; 357 } 358 359 private object _gate; 360 private IList<TSource> _s; 361 private int _n; 362 private int _windowId; 363 364 private SerialDisposable _timerD; 365 Run()366 public IDisposable Run() 367 { 368 _gate = new object(); 369 _s = default(IList<TSource>); 370 _n = 0; 371 _windowId = 0; 372 373 _timerD = new SerialDisposable(); 374 375 _s = new List<TSource>(); 376 CreateTimer(0); 377 378 var subscription = _parent._source.SubscribeSafe(this); 379 380 return new CompositeDisposable { _timerD, subscription }; 381 } 382 CreateTimer(int id)383 private void CreateTimer(int id) 384 { 385 var m = new SingleAssignmentDisposable(); 386 _timerD.Disposable = m; 387 388 m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick); 389 } 390 Tick(IScheduler self, int id)391 private IDisposable Tick(IScheduler self, int id) 392 { 393 var d = Disposable.Empty; 394 395 var newId = 0; 396 lock (_gate) 397 { 398 if (id != _windowId) 399 return d; 400 401 _n = 0; 402 newId = ++_windowId; 403 404 var res = _s; 405 _s = new List<TSource>(); 406 base._observer.OnNext(res); 407 } 408 409 CreateTimer(newId); 410 411 return d; 412 } 413 OnNext(TSource value)414 public void OnNext(TSource value) 415 { 416 var newWindow = false; 417 var newId = 0; 418 419 lock (_gate) 420 { 421 _s.Add(value); 422 423 _n++; 424 if (_n == _parent._count) 425 { 426 newWindow = true; 427 _n = 0; 428 newId = ++_windowId; 429 430 var res = _s; 431 _s = new List<TSource>(); 432 base._observer.OnNext(res); 433 } 434 } 435 436 if (newWindow) 437 CreateTimer(newId); 438 } 439 OnError(Exception error)440 public void OnError(Exception error) 441 { 442 lock (_gate) 443 { 444 _s.Clear(); 445 base._observer.OnError(error); 446 base.Dispose(); 447 } 448 } 449 OnCompleted()450 public void OnCompleted() 451 { 452 lock (_gate) 453 { 454 base._observer.OnNext(_s); 455 base._observer.OnCompleted(); 456 base.Dispose(); 457 } 458 } 459 } 460 } 461 462 class Buffer<TSource, TBufferClosing> : Producer<IList<TSource>> 463 { 464 private readonly IObservable<TSource> _source; 465 private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector; 466 private readonly IObservable<TBufferClosing> _bufferBoundaries; 467 Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)468 public Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector) 469 { 470 _source = source; 471 _bufferClosingSelector = bufferClosingSelector; 472 } 473 Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)474 public Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries) 475 { 476 _source = source; 477 _bufferBoundaries = bufferBoundaries; 478 } 479 Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)480 protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink) 481 { 482 if (_bufferClosingSelector != null) 483 { 484 var sink = new _(this, observer, cancel); 485 setSink(sink); 486 return sink.Run(); 487 } 488 else 489 { 490 var sink = new Beta(this, observer, cancel); 491 setSink(sink); 492 return sink.Run(); 493 } 494 } 495 496 class _ : Sink<IList<TSource>>, IObserver<TSource> 497 { 498 private readonly Buffer<TSource, TBufferClosing> _parent; 499 _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)500 public _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel) 501 : base(observer, cancel) 502 { 503 _parent = parent; 504 } 505 506 private IList<TSource> _buffer; 507 private object _gate; 508 private AsyncLock _bufferGate; 509 510 private SerialDisposable _m; 511 Run()512 public IDisposable Run() 513 { 514 _buffer = new List<TSource>(); 515 _gate = new object(); 516 _bufferGate = new AsyncLock(); 517 518 _m = new SerialDisposable(); 519 var groupDisposable = new CompositeDisposable(2) { _m }; 520 521 groupDisposable.Add(_parent._source.SubscribeSafe(this)); 522 523 _bufferGate.Wait(CreateBufferClose); 524 525 return groupDisposable; 526 } 527 CreateBufferClose()528 private void CreateBufferClose() 529 { 530 var bufferClose = default(IObservable<TBufferClosing>); 531 try 532 { 533 bufferClose = _parent._bufferClosingSelector(); 534 } 535 catch (Exception exception) 536 { 537 lock (_gate) 538 { 539 base._observer.OnError(exception); 540 base.Dispose(); 541 } 542 return; 543 } 544 545 var closingSubscription = new SingleAssignmentDisposable(); 546 _m.Disposable = closingSubscription; 547 closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription)); 548 } 549 CloseBuffer(IDisposable closingSubscription)550 private void CloseBuffer(IDisposable closingSubscription) 551 { 552 closingSubscription.Dispose(); 553 554 lock (_gate) 555 { 556 var res = _buffer; 557 _buffer = new List<TSource>(); 558 base._observer.OnNext(res); 559 } 560 561 _bufferGate.Wait(CreateBufferClose); 562 } 563 564 class Omega : IObserver<TBufferClosing> 565 { 566 private readonly _ _parent; 567 private readonly IDisposable _self; 568 Omega(_ parent, IDisposable self)569 public Omega(_ parent, IDisposable self) 570 { 571 _parent = parent; 572 _self = self; 573 } 574 OnNext(TBufferClosing value)575 public void OnNext(TBufferClosing value) 576 { 577 _parent.CloseBuffer(_self); 578 } 579 OnError(Exception error)580 public void OnError(Exception error) 581 { 582 _parent.OnError(error); 583 } 584 OnCompleted()585 public void OnCompleted() 586 { 587 _parent.CloseBuffer(_self); 588 } 589 } 590 OnNext(TSource value)591 public void OnNext(TSource value) 592 { 593 lock (_gate) 594 { 595 _buffer.Add(value); 596 } 597 } 598 OnError(Exception error)599 public void OnError(Exception error) 600 { 601 lock (_gate) 602 { 603 _buffer.Clear(); 604 base._observer.OnError(error); 605 base.Dispose(); 606 } 607 } 608 OnCompleted()609 public void OnCompleted() 610 { 611 lock (_gate) 612 { 613 base._observer.OnNext(_buffer); 614 base._observer.OnCompleted(); 615 base.Dispose(); 616 } 617 } 618 } 619 620 class Beta : Sink<IList<TSource>>, IObserver<TSource> 621 { 622 private readonly Buffer<TSource, TBufferClosing> _parent; 623 Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)624 public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel) 625 : base(observer, cancel) 626 { 627 _parent = parent; 628 } 629 630 private IList<TSource> _buffer; 631 private object _gate; 632 633 private RefCountDisposable _refCountDisposable; 634 Run()635 public IDisposable Run() 636 { 637 _buffer = new List<TSource>(); 638 _gate = new object(); 639 640 var d = new CompositeDisposable(2); 641 _refCountDisposable = new RefCountDisposable(d); 642 643 d.Add(_parent._source.SubscribeSafe(this)); 644 d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this))); 645 646 return _refCountDisposable; 647 } 648 649 class Omega : IObserver<TBufferClosing> 650 { 651 private readonly Beta _parent; 652 Omega(Beta parent)653 public Omega(Beta parent) 654 { 655 _parent = parent; 656 } 657 OnNext(TBufferClosing value)658 public void OnNext(TBufferClosing value) 659 { 660 lock (_parent._gate) 661 { 662 var res = _parent._buffer; 663 _parent._buffer = new List<TSource>(); 664 _parent._observer.OnNext(res); 665 } 666 } 667 OnError(Exception error)668 public void OnError(Exception error) 669 { 670 _parent.OnError(error); 671 } 672 OnCompleted()673 public void OnCompleted() 674 { 675 _parent.OnCompleted(); 676 } 677 } 678 OnNext(TSource value)679 public void OnNext(TSource value) 680 { 681 lock (_gate) 682 { 683 _buffer.Add(value); 684 } 685 } 686 OnError(Exception error)687 public void OnError(Exception error) 688 { 689 lock (_gate) 690 { 691 _buffer.Clear(); 692 base._observer.OnError(error); 693 base.Dispose(); 694 } 695 } 696 OnCompleted()697 public void OnCompleted() 698 { 699 lock (_gate) 700 { 701 base._observer.OnNext(_buffer); 702 base._observer.OnCompleted(); 703 base.Dispose(); 704 } 705 } 706 } 707 } 708 } 709 #endif