// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that yields the arithmetic mean of a sequence of <see cref="double"/> values.
    /// An empty source is reported to the observer as an <see cref="InvalidOperationException"/>.
    /// </summary>
    class AverageDouble : Producer<double>
    {
        private readonly IObservable<double> _source;

        public AverageDouble(IObservable<double> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that keeps a running sum and element count and emits sum / count on completion.
        class _ : Sink<double>, IObserver<double>
        {
            private double _sum = 0.0;
            private long _count = 0L;

            public _(IObserver<double> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(double value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                if (_count <= 0)
                {
                    // No elements were observed: there is no average to report.
                    OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                base._observer.OnNext(_sum / _count);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields the arithmetic mean of a sequence of <see cref="float"/> values.
    /// An empty source is reported to the observer as an <see cref="InvalidOperationException"/>.
    /// </summary>
    class AverageSingle : Producer<float>
    {
        private readonly IObservable<float> _source;

        public AverageSingle(IObservable<float> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that keeps a running sum and element count and emits sum / count on completion.
        class _ : Sink<float>, IObserver<float>
        {
            // NOTE: The accumulator is a double rather than a float, conform LINQ to Objects.
            private double _sum = 0.0;
            private long _count = 0L;

            public _(IObserver<float> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(float value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                if (_count <= 0)
                {
                    // No elements were observed: there is no average to report.
                    OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                // The double-precision quotient is narrowed back to the element type.
                base._observer.OnNext((float)(_sum / _count));
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields the arithmetic mean of a sequence of <see cref="decimal"/> values.
    /// An empty source is reported to the observer as an <see cref="InvalidOperationException"/>.
    /// </summary>
    class AverageDecimal : Producer<decimal>
    {
        private readonly IObservable<decimal> _source;

        public AverageDecimal(IObservable<decimal> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that keeps a running sum and element count and emits sum / count on completion.
        class _ : Sink<decimal>, IObserver<decimal>
        {
            private decimal _sum = 0M;
            private long _count = 0L;

            public _(IObserver<decimal> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(decimal value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating (e.g. overflow) is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                if (_count <= 0)
                {
                    // No elements were observed: there is no average to report.
                    OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                base._observer.OnNext(_sum / _count);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields, as a <see cref="double"/>, the arithmetic mean of a sequence of <see cref="int"/> values.
    /// An empty source is reported to the observer as an <see cref="InvalidOperationException"/>.
    /// </summary>
    class AverageInt32 : Producer<double>
    {
        private readonly IObservable<int> _source;

        public AverageInt32(IObservable<int> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that keeps a running 64-bit sum and element count and emits sum / count on completion.
        class _ : Sink<double>, IObserver<int>
        {
            private long _sum = 0L;
            private long _count = 0L;

            public _(IObserver<double> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(int value)
            {
                try
                {
                    // Checked arithmetic: overflow of the sum or the counter throws instead of wrapping.
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                if (_count <= 0)
                {
                    // No elements were observed: there is no average to report.
                    OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                // Cast first so the division is carried out in floating point.
                base._observer.OnNext((double)_sum / _count);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields, as a <see cref="double"/>, the arithmetic mean of a sequence of <see cref="long"/> values.
    /// An empty source is reported to the observer as an <see cref="InvalidOperationException"/>.
    /// </summary>
    class AverageInt64 : Producer<double>
    {
        private readonly IObservable<long> _source;

        public AverageInt64(IObservable<long> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that keeps a running 64-bit sum and element count and emits sum / count on completion.
        class _ : Sink<double>, IObserver<long>
        {
            private long _sum = 0L;
            private long _count = 0L;

            public _(IObserver<double> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(long value)
            {
                try
                {
                    // Checked arithmetic: overflow of the sum or the counter throws instead of wrapping.
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                if (_count <= 0)
                {
                    // No elements were observed: there is no average to report.
                    OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                // Cast first so the division is carried out in floating point.
                base._observer.OnNext((double)_sum / _count);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields the arithmetic mean of the non-null values in a sequence of nullable
    /// <see cref="double"/> values, or null when the source produced no non-null value.
    /// </summary>
    class AverageDoubleNullable : Producer<double?>
    {
        private readonly IObservable<double?> _source;

        public AverageDoubleNullable(IObservable<double?> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that sums and counts only the non-null elements.
        class _ : Sink<double?>, IObserver<double?>
        {
            private double _sum = 0.0;
            private long _count = 0L;

            public _(IObserver<double?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(double? value)
            {
                if (!value.HasValue)
                {
                    // Null elements contribute to neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                // Without any non-null element the result is null rather than an error.
                double? average = _count > 0 ? _sum / _count : (double?)null;

                base._observer.OnNext(average);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields the arithmetic mean of the non-null values in a sequence of nullable
    /// <see cref="float"/> values, or null when the source produced no non-null value.
    /// </summary>
    class AverageSingleNullable : Producer<float?>
    {
        private readonly IObservable<float?> _source;

        public AverageSingleNullable(IObservable<float?> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that sums and counts only the non-null elements.
        class _ : Sink<float?>, IObserver<float?>
        {
            // NOTE: The accumulator is a double rather than a float, conform LINQ to Objects.
            private double _sum = 0.0;
            private long _count = 0L;

            public _(IObserver<float?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(float? value)
            {
                if (!value.HasValue)
                {
                    // Null elements contribute to neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                // Without any non-null element the result is null rather than an error;
                // otherwise the double-precision quotient is narrowed back to float.
                float? average = _count > 0 ? (float?)(float)(_sum / _count) : null;

                base._observer.OnNext(average);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields the arithmetic mean of the non-null values in a sequence of nullable
    /// <see cref="decimal"/> values, or null when the source produced no non-null value.
    /// </summary>
    class AverageDecimalNullable : Producer<decimal?>
    {
        private readonly IObservable<decimal?> _source;

        public AverageDecimalNullable(IObservable<decimal?> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that sums and counts only the non-null elements.
        class _ : Sink<decimal?>, IObserver<decimal?>
        {
            private decimal _sum = 0M;
            private long _count = 0L;

            public _(IObserver<decimal?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(decimal? value)
            {
                if (!value.HasValue)
                {
                    // Null elements contribute to neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating (e.g. overflow) is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                // Without any non-null element the result is null rather than an error.
                decimal? average = _count > 0 ? _sum / _count : (decimal?)null;

                base._observer.OnNext(average);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields, as a nullable <see cref="double"/>, the arithmetic mean of the non-null values
    /// in a sequence of nullable <see cref="int"/> values, or null when the source produced no non-null value.
    /// </summary>
    class AverageInt32Nullable : Producer<double?>
    {
        private readonly IObservable<int?> _source;

        public AverageInt32Nullable(IObservable<int?> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that sums (in 64 bits) and counts only the non-null elements.
        class _ : Sink<double?>, IObserver<int?>
        {
            private long _sum = 0L;
            private long _count = 0L;

            public _(IObserver<double?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(int? value)
            {
                if (!value.HasValue)
                {
                    // Null elements contribute to neither the sum nor the count.
                    return;
                }

                try
                {
                    // Checked arithmetic: overflow of the sum or the counter throws instead of wrapping.
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                // Without any non-null element the result is null rather than an error;
                // otherwise cast first so the division is carried out in floating point.
                double? average = _count > 0 ? (double)_sum / _count : (double?)null;

                base._observer.OnNext(average);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that yields, as a nullable <see cref="double"/>, the arithmetic mean of the non-null values
    /// in a sequence of nullable <see cref="long"/> values, or null when the source produced no non-null value.
    /// </summary>
    class AverageInt64Nullable : Producer<double?>
    {
        private readonly IObservable<long?> _source;

        public AverageInt64Nullable(IObservable<long?> source)
        {
            _source = source;
        }

        // Creates the averaging sink, publishes it through setSink, then subscribes it to the source.
        protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var averager = new _(observer, cancel);
            setSink(averager);
            return _source.SubscribeSafe(averager);
        }

        // Sink that sums (in 64 bits) and counts only the non-null elements.
        class _ : Sink<double?>, IObserver<long?>
        {
            private long _sum = 0L;
            private long _count = 0L;

            public _(IObserver<double?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public void OnNext(long? value)
            {
                if (!value.HasValue)
                {
                    // Null elements contribute to neither the sum nor the count.
                    return;
                }

                try
                {
                    // Checked arithmetic: overflow of the sum or the counter throws instead of wrapping.
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1L);
                }
                catch (Exception ex)
                {
                    // Any failure while accumulating is forwarded downstream and ends the sink.
                    OnError(ex);
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                // Without any non-null element the result is null rather than an error;
                // otherwise cast first so the division is carried out in floating point.
                double? average = _count > 0 ? (double)_sum / _count : (double?)null;

                base._observer.OnNext(average);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}
#endif