// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;

namespace System.Reactive.Linq.ObservableImpl
{
    // Computes the maximum element of a sequence using a caller-supplied comparer.
    // Nothing is forwarded to the observer until the source completes.
    class Max<TSource> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly IComparer<TSource> _comparer;

        // source:   sequence whose maximum is computed.
        // comparer: orders the elements; an element becomes the new maximum when
        //           Compare(element, currentMaximum) returns a positive number.
        public Max(IObservable<TSource> source, IComparer<TSource> comparer)
        {
            _source = source;
            _comparer = comparer;
        }

        // Creates the sink matching TSource, hands it to setSink and subscribes it to the source.
        // Returns the subscription obtained from SubscribeSafe.
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            // LINQ to Objects makes this distinction so that Max of an empty collection yields null
            // when default(TSource) is null; for all other types an empty sequence is an error.
            if (default(TSource) == null)
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return _source.SubscribeSafe(sink);
            }
            else
            {
                var sink = new Delta(this, observer, cancel);
                setSink(sink);
                return _source.SubscribeSafe(sink);
            }
        }

        // Sink used when default(TSource) is not null: an empty sequence is reported as an error.
        class Delta : Sink<TSource>, IObserver<TSource>
        {
            private readonly Max<TSource> _parent;
            private bool _hasValue;
            private TSource _lastValue;

            public Delta(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;

                _hasValue = false;
                _lastValue = default(TSource);
            }

            // Keeps the first element as the initial maximum; later elements replace it only
            // when the comparer ranks them strictly greater.
            public void OnNext(TSource value)
            {
                if (_hasValue)
                {
                    var comparison = 0;

                    try
                    {
                        comparison = _parent._comparer.Compare(value, _lastValue);
                    }
                    catch (Exception ex)
                    {
                        // A throwing comparer terminates the sequence with that exception.
                        base._observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    if (comparison > 0)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _hasValue = true;
                    _lastValue = value;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum followed by completion, or an InvalidOperationException
            // (Strings_Linq.NO_ELEMENTS) when no element was received; then disposes the sink.
            public void OnCompleted()
            {
                if (!_hasValue)
                {
                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }
                else
                {
                    base._observer.OnNext(_lastValue);
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }

        // Sink used when default(TSource) is null: null elements are skipped and an empty
        // (or all-null) sequence produces null.
        class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly Max<TSource> _parent;
            private TSource _lastValue;

            public _(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;

                _lastValue = default(TSource);
            }

            // Ignores null elements. The first non-null element becomes the maximum; later
            // non-null elements replace it only when the comparer ranks them strictly greater.
            public void OnNext(TSource value)
            {
                if (value != null)
                {
                    if (_lastValue == null)
                    {
                        _lastValue = value;
                    }
                    else
                    {
                        var comparison = 0;

                        try
                        {
                            comparison = _parent._comparer.Compare(value, _lastValue);
                        }
                        catch (Exception ex)
                        {
                            // A throwing comparer terminates the sequence with that exception.
                            base._observer.OnError(ex);
                            base.Dispose();
                            return;
                        }

                        if (comparison > 0)
                        {
                            _lastValue = value;
                        }
                    }
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum (null when no non-null element was received), completes,
            // and disposes the sink.
            public void OnCompleted()
            {
                base._observer.OnNext(_lastValue);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of double values.
    class MaxDouble : Producer<double>
    {
        private readonly IObservable<double> _source;

        public MaxDouble(IObservable<double> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<double>, IObserver<double>
        {
            private bool _hasValue;
            private double _lastValue;

            public _(IObserver<double> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _hasValue = false;
                _lastValue = default(double);
            }

            public void OnNext(double value)
            {
                if (_hasValue)
                {
                    // A NaN element always replaces the current maximum. Once NaN is stored,
                    // "value > NaN" is false for every later element, so NaN stays the result.
                    if (value > _lastValue || double.IsNaN(value))
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                    _hasValue = true;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum followed by completion, or an InvalidOperationException
            // (Strings_Linq.NO_ELEMENTS) when the sequence was empty; then disposes the sink.
            public void OnCompleted()
            {
                if (!_hasValue)
                {
                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }
                else
                {
                    base._observer.OnNext(_lastValue);
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of float values.
    class MaxSingle : Producer<float>
    {
        private readonly IObservable<float> _source;

        public MaxSingle(IObservable<float> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<float>, IObserver<float>
        {
            private bool _hasValue;
            private float _lastValue;

            public _(IObserver<float> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _hasValue = false;
                _lastValue = default(float);
            }

            public void OnNext(float value)
            {
                if (_hasValue)
                {
                    // A NaN element always replaces the current maximum. Once NaN is stored,
                    // "value > NaN" is false for every later element, so NaN stays the result.
                    if (value > _lastValue || float.IsNaN(value))
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                    _hasValue = true;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum followed by completion, or an InvalidOperationException
            // (Strings_Linq.NO_ELEMENTS) when the sequence was empty; then disposes the sink.
            public void OnCompleted()
            {
                if (!_hasValue)
                {
                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }
                else
                {
                    base._observer.OnNext(_lastValue);
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of decimal values.
    class MaxDecimal : Producer<decimal>
    {
        private readonly IObservable<decimal> _source;

        public MaxDecimal(IObservable<decimal> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<decimal>, IObserver<decimal>
        {
            private bool _hasValue;
            private decimal _lastValue;

            public _(IObserver<decimal> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _hasValue = false;
                _lastValue = default(decimal);
            }

            // Keeps the first element, then replaces it whenever a strictly greater one arrives.
            public void OnNext(decimal value)
            {
                if (_hasValue)
                {
                    if (value > _lastValue)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                    _hasValue = true;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum followed by completion, or an InvalidOperationException
            // (Strings_Linq.NO_ELEMENTS) when the sequence was empty; then disposes the sink.
            public void OnCompleted()
            {
                if (!_hasValue)
                {
                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }
                else
                {
                    base._observer.OnNext(_lastValue);
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of int values.
    class MaxInt32 : Producer<int>
    {
        private readonly IObservable<int> _source;

        public MaxInt32(IObservable<int> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<int>, IObserver<int>
        {
            private bool _hasValue;
            private int _lastValue;

            public _(IObserver<int> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _hasValue = false;
                _lastValue = default(int);
            }

            // Keeps the first element, then replaces it whenever a strictly greater one arrives.
            public void OnNext(int value)
            {
                if (_hasValue)
                {
                    if (value > _lastValue)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                    _hasValue = true;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum followed by completion, or an InvalidOperationException
            // (Strings_Linq.NO_ELEMENTS) when the sequence was empty; then disposes the sink.
            public void OnCompleted()
            {
                if (!_hasValue)
                {
                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }
                else
                {
                    base._observer.OnNext(_lastValue);
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of long values.
    class MaxInt64 : Producer<long>
    {
        private readonly IObservable<long> _source;

        public MaxInt64(IObservable<long> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<long>, IObserver<long>
        {
            private bool _hasValue;
            private long _lastValue;

            public _(IObserver<long> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _hasValue = false;
                _lastValue = default(long);
            }

            // Keeps the first element, then replaces it whenever a strictly greater one arrives.
            public void OnNext(long value)
            {
                if (_hasValue)
                {
                    if (value > _lastValue)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                    _hasValue = true;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum followed by completion, or an InvalidOperationException
            // (Strings_Linq.NO_ELEMENTS) when the sequence was empty; then disposes the sink.
            public void OnCompleted()
            {
                if (!_hasValue)
                {
                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }
                else
                {
                    base._observer.OnNext(_lastValue);
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of nullable double values.
    // Null elements are skipped; a sequence without any non-null element yields null.
    class MaxDoubleNullable : Producer<double?>
    {
        private readonly IObservable<double?> _source;

        public MaxDoubleNullable(IObservable<double?> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<double?>, IObserver<double?>
        {
            private double? _lastValue;

            public _(IObserver<double?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _lastValue = default(double?);
            }

            public void OnNext(double? value)
            {
                if (!value.HasValue)
                    return;

                if (_lastValue.HasValue)
                {
                    // A NaN element always replaces the current maximum. Once NaN is stored,
                    // "value > NaN" is false for every later element, so NaN stays the result.
                    if (value > _lastValue || double.IsNaN((double)value))
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum (null when no non-null element was received), completes,
            // and disposes the sink.
            public void OnCompleted()
            {
                base._observer.OnNext(_lastValue);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of nullable float values.
    // Null elements are skipped; a sequence without any non-null element yields null.
    class MaxSingleNullable : Producer<float?>
    {
        private readonly IObservable<float?> _source;

        public MaxSingleNullable(IObservable<float?> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<float?>, IObserver<float?>
        {
            private float? _lastValue;

            public _(IObserver<float?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _lastValue = default(float?);
            }

            public void OnNext(float? value)
            {
                if (!value.HasValue)
                    return;

                if (_lastValue.HasValue)
                {
                    // A NaN element always replaces the current maximum. Once NaN is stored,
                    // "value > NaN" is false for every later element, so NaN stays the result.
                    if (value > _lastValue || float.IsNaN((float)value))
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum (null when no non-null element was received), completes,
            // and disposes the sink.
            public void OnCompleted()
            {
                base._observer.OnNext(_lastValue);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of nullable decimal values.
    // Null elements are skipped; a sequence without any non-null element yields null.
    class MaxDecimalNullable : Producer<decimal?>
    {
        private readonly IObservable<decimal?> _source;

        public MaxDecimalNullable(IObservable<decimal?> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<decimal?>, IObserver<decimal?>
        {
            private decimal? _lastValue;

            public _(IObserver<decimal?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _lastValue = default(decimal?);
            }

            // Skips null; keeps the first non-null element, then replaces it whenever a
            // strictly greater one arrives.
            public void OnNext(decimal? value)
            {
                if (!value.HasValue)
                    return;

                if (_lastValue.HasValue)
                {
                    if (value > _lastValue)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum (null when no non-null element was received), completes,
            // and disposes the sink.
            public void OnCompleted()
            {
                base._observer.OnNext(_lastValue);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of nullable int values.
    // Null elements are skipped; a sequence without any non-null element yields null.
    class MaxInt32Nullable : Producer<int?>
    {
        private readonly IObservable<int?> _source;

        public MaxInt32Nullable(IObservable<int?> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<int?>, IObserver<int?>
        {
            private int? _lastValue;

            public _(IObserver<int?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _lastValue = default(int?);
            }

            // Skips null; keeps the first non-null element, then replaces it whenever a
            // strictly greater one arrives.
            public void OnNext(int? value)
            {
                if (!value.HasValue)
                    return;

                if (_lastValue.HasValue)
                {
                    if (value > _lastValue)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum (null when no non-null element was received), completes,
            // and disposes the sink.
            public void OnCompleted()
            {
                base._observer.OnNext(_lastValue);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    // Computes the maximum of a sequence of nullable long values.
    // Null elements are skipped; a sequence without any non-null element yields null.
    class MaxInt64Nullable : Producer<long?>
    {
        private readonly IObservable<long?> _source;

        public MaxInt64Nullable(IObservable<long?> source)
        {
            _source = source;
        }

        // Creates the sink, hands it to setSink and subscribes it to the source.
        protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        class _ : Sink<long?>, IObserver<long?>
        {
            private long? _lastValue;

            public _(IObserver<long?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _lastValue = default(long?);
            }

            // Skips null; keeps the first non-null element, then replaces it whenever a
            // strictly greater one arrives.
            public void OnNext(long? value)
            {
                if (!value.HasValue)
                    return;

                if (_lastValue.HasValue)
                {
                    if (value > _lastValue)
                    {
                        _lastValue = value;
                    }
                }
                else
                {
                    _lastValue = value;
                }
            }

            // Forwards the source error and disposes the sink.
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            // Emits the maximum (null when no non-null element was received), completes,
            // and disposes the sink.
            public void OnCompleted()
            {
                base._observer.OnNext(_lastValue);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}
#endif