1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System;
5 
6 namespace System.Reactive.Linq.ObservableImpl
7 {
8     class AverageDouble : Producer<double>
9     {
10         private readonly IObservable<double> _source;
11 
AverageDouble(IObservable<double> source)12         public AverageDouble(IObservable<double> source)
13         {
14             _source = source;
15         }
16 
Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)17         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
18         {
19             var sink = new _(observer, cancel);
20             setSink(sink);
21             return _source.SubscribeSafe(sink);
22         }
23 
24         class _ : Sink<double>, IObserver<double>
25         {
26             private double _sum;
27             private long _count;
28 
_(IObserver<double> observer, IDisposable cancel)29             public _(IObserver<double> observer, IDisposable cancel)
30                 : base(observer, cancel)
31             {
32                 _sum = 0.0;
33                 _count = 0L;
34             }
35 
OnNext(double value)36             public void OnNext(double value)
37             {
38                 try
39                 {
40                     checked
41                     {
42                         _sum += value;
43                         _count++;
44                     }
45                 }
46                 catch (Exception ex)
47                 {
48                     base._observer.OnError(ex);
49                     base.Dispose();
50                 }
51             }
52 
OnError(Exception error)53             public void OnError(Exception error)
54             {
55                 base._observer.OnError(error);
56                 base.Dispose();
57             }
58 
OnCompleted()59             public void OnCompleted()
60             {
61                 if (_count > 0)
62                 {
63                     base._observer.OnNext(_sum / _count);
64                     base._observer.OnCompleted();
65                 }
66                 else
67                 {
68                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
69                 }
70 
71                 base.Dispose();
72             }
73         }
74     }
75 
76     class AverageSingle : Producer<float>
77     {
78         private readonly IObservable<float> _source;
79 
AverageSingle(IObservable<float> source)80         public AverageSingle(IObservable<float> source)
81         {
82             _source = source;
83         }
84 
Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)85         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
86         {
87             var sink = new _(observer, cancel);
88             setSink(sink);
89             return _source.SubscribeSafe(sink);
90         }
91 
92         class _ : Sink<float>, IObserver<float>
93         {
94             private double _sum; // NOTE: Uses a different accumulator type (double), conform LINQ to Objects.
95             private long _count;
96 
_(IObserver<float> observer, IDisposable cancel)97             public _(IObserver<float> observer, IDisposable cancel)
98                 : base(observer, cancel)
99             {
100                 _sum = 0.0;
101                 _count = 0L;
102             }
103 
OnNext(float value)104             public void OnNext(float value)
105             {
106                 try
107                 {
108                     checked
109                     {
110                         _sum += value;
111                         _count++;
112                     }
113                 }
114                 catch (Exception ex)
115                 {
116                     base._observer.OnError(ex);
117                     base.Dispose();
118                 }
119             }
120 
OnError(Exception error)121             public void OnError(Exception error)
122             {
123                 base._observer.OnError(error);
124                 base.Dispose();
125             }
126 
OnCompleted()127             public void OnCompleted()
128             {
129                 if (_count > 0)
130                 {
131                     base._observer.OnNext((float)(_sum / _count));
132                     base._observer.OnCompleted();
133                 }
134                 else
135                 {
136                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
137                 }
138 
139                 base.Dispose();
140             }
141         }
142     }
143 
144     class AverageDecimal : Producer<decimal>
145     {
146         private readonly IObservable<decimal> _source;
147 
AverageDecimal(IObservable<decimal> source)148         public AverageDecimal(IObservable<decimal> source)
149         {
150             _source = source;
151         }
152 
Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)153         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
154         {
155             var sink = new _(observer, cancel);
156             setSink(sink);
157             return _source.SubscribeSafe(sink);
158         }
159 
160         class _ : Sink<decimal>, IObserver<decimal>
161         {
162             private decimal _sum;
163             private long _count;
164 
_(IObserver<decimal> observer, IDisposable cancel)165             public _(IObserver<decimal> observer, IDisposable cancel)
166                 : base(observer, cancel)
167             {
168                 _sum = 0M;
169                 _count = 0L;
170             }
171 
OnNext(decimal value)172             public void OnNext(decimal value)
173             {
174                 try
175                 {
176                     checked
177                     {
178                         _sum += value;
179                         _count++;
180                     }
181                 }
182                 catch (Exception ex)
183                 {
184                     base._observer.OnError(ex);
185                     base.Dispose();
186                 }
187             }
188 
OnError(Exception error)189             public void OnError(Exception error)
190             {
191                 base._observer.OnError(error);
192                 base.Dispose();
193             }
194 
OnCompleted()195             public void OnCompleted()
196             {
197                 if (_count > 0)
198                 {
199                     base._observer.OnNext(_sum / _count);
200                     base._observer.OnCompleted();
201                 }
202                 else
203                 {
204                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
205                 }
206 
207                 base.Dispose();
208             }
209         }
210     }
211 
212     class AverageInt32 : Producer<double>
213     {
214         private readonly IObservable<int> _source;
215 
AverageInt32(IObservable<int> source)216         public AverageInt32(IObservable<int> source)
217         {
218             _source = source;
219         }
220 
Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)221         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
222         {
223             var sink = new _(observer, cancel);
224             setSink(sink);
225             return _source.SubscribeSafe(sink);
226         }
227 
228         class _ : Sink<double>, IObserver<int>
229         {
230             private long _sum;
231             private long _count;
232 
_(IObserver<double> observer, IDisposable cancel)233             public _(IObserver<double> observer, IDisposable cancel)
234                 : base(observer, cancel)
235             {
236                 _sum = 0L;
237                 _count = 0L;
238             }
239 
OnNext(int value)240             public void OnNext(int value)
241             {
242                 try
243                 {
244                     checked
245                     {
246                         _sum += value;
247                         _count++;
248                     }
249                 }
250                 catch (Exception ex)
251                 {
252                     base._observer.OnError(ex);
253                     base.Dispose();
254                 }
255             }
256 
OnError(Exception error)257             public void OnError(Exception error)
258             {
259                 base._observer.OnError(error);
260                 base.Dispose();
261             }
262 
OnCompleted()263             public void OnCompleted()
264             {
265                 if (_count > 0)
266                 {
267                     base._observer.OnNext((double)_sum / _count);
268                     base._observer.OnCompleted();
269                 }
270                 else
271                 {
272                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
273                 }
274 
275                 base.Dispose();
276             }
277         }
278     }
279 
280     class AverageInt64 : Producer<double>
281     {
282         private readonly IObservable<long> _source;
283 
AverageInt64(IObservable<long> source)284         public AverageInt64(IObservable<long> source)
285         {
286             _source = source;
287         }
288 
Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)289         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
290         {
291             var sink = new _(observer, cancel);
292             setSink(sink);
293             return _source.SubscribeSafe(sink);
294         }
295 
296         class _ : Sink<double>, IObserver<long>
297         {
298             private long _sum;
299             private long _count;
300 
_(IObserver<double> observer, IDisposable cancel)301             public _(IObserver<double> observer, IDisposable cancel)
302                 : base(observer, cancel)
303             {
304                 _sum = 0L;
305                 _count = 0L;
306             }
307 
OnNext(long value)308             public void OnNext(long value)
309             {
310                 try
311                 {
312                     checked
313                     {
314                         _sum += value;
315                         _count++;
316                     }
317                 }
318                 catch (Exception ex)
319                 {
320                     base._observer.OnError(ex);
321                     base.Dispose();
322                 }
323             }
324 
OnError(Exception error)325             public void OnError(Exception error)
326             {
327                 base._observer.OnError(error);
328                 base.Dispose();
329             }
330 
OnCompleted()331             public void OnCompleted()
332             {
333                 if (_count > 0)
334                 {
335                     base._observer.OnNext((double)_sum / _count);
336                     base._observer.OnCompleted();
337                 }
338                 else
339                 {
340                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
341                 }
342 
343                 base.Dispose();
344             }
345         }
346     }
347 
348     class AverageDoubleNullable : Producer<double?>
349     {
350         private readonly IObservable<double?> _source;
351 
AverageDoubleNullable(IObservable<double?> source)352         public AverageDoubleNullable(IObservable<double?> source)
353         {
354             _source = source;
355         }
356 
Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)357         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
358         {
359             var sink = new _(observer, cancel);
360             setSink(sink);
361             return _source.SubscribeSafe(sink);
362         }
363 
364         class _ : Sink<double?>, IObserver<double?>
365         {
366             private double _sum;
367             private long _count;
368 
_(IObserver<double?> observer, IDisposable cancel)369             public _(IObserver<double?> observer, IDisposable cancel)
370                 : base(observer, cancel)
371             {
372                 _sum = 0.0;
373                 _count = 0L;
374             }
375 
OnNext(double? value)376             public void OnNext(double? value)
377             {
378                 try
379                 {
380                     checked
381                     {
382                         if (value != null)
383                         {
384                             _sum += value.Value;
385                             _count++;
386                         }
387                     }
388                 }
389                 catch (Exception ex)
390                 {
391                     base._observer.OnError(ex);
392                     base.Dispose();
393                 }
394             }
395 
OnError(Exception error)396             public void OnError(Exception error)
397             {
398                 base._observer.OnError(error);
399                 base.Dispose();
400             }
401 
OnCompleted()402             public void OnCompleted()
403             {
404                 if (_count > 0)
405                 {
406                     base._observer.OnNext(_sum / _count);
407                 }
408                 else
409                 {
410                     base._observer.OnNext(null);
411                 }
412 
413                 base._observer.OnCompleted();
414                 base.Dispose();
415             }
416         }
417     }
418 
419     class AverageSingleNullable : Producer<float?>
420     {
421         private readonly IObservable<float?> _source;
422 
AverageSingleNullable(IObservable<float?> source)423         public AverageSingleNullable(IObservable<float?> source)
424         {
425             _source = source;
426         }
427 
Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)428         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
429         {
430             var sink = new _(observer, cancel);
431             setSink(sink);
432             return _source.SubscribeSafe(sink);
433         }
434 
435         class _ : Sink<float?>, IObserver<float?>
436         {
437             private double _sum; // NOTE: Uses a different accumulator type (double), conform LINQ to Objects.
438             private long _count;
439 
_(IObserver<float?> observer, IDisposable cancel)440             public _(IObserver<float?> observer, IDisposable cancel)
441                 : base(observer, cancel)
442             {
443                 _sum = 0.0;
444                 _count = 0L;
445             }
446 
OnNext(float? value)447             public void OnNext(float? value)
448             {
449                 try
450                 {
451                     checked
452                     {
453                         if (value != null)
454                         {
455                             _sum += value.Value;
456                             _count++;
457                         }
458                     }
459                 }
460                 catch (Exception ex)
461                 {
462                     base._observer.OnError(ex);
463                     base.Dispose();
464                 }
465             }
466 
OnError(Exception error)467             public void OnError(Exception error)
468             {
469                 base._observer.OnError(error);
470                 base.Dispose();
471             }
472 
OnCompleted()473             public void OnCompleted()
474             {
475                 if (_count > 0)
476                 {
477                     base._observer.OnNext((float)(_sum / _count));
478                 }
479                 else
480                 {
481                     base._observer.OnNext(null);
482                 }
483 
484                 base._observer.OnCompleted();
485                 base.Dispose();
486             }
487         }
488     }
489 
490     class AverageDecimalNullable : Producer<decimal?>
491     {
492         private readonly IObservable<decimal?> _source;
493 
AverageDecimalNullable(IObservable<decimal?> source)494         public AverageDecimalNullable(IObservable<decimal?> source)
495         {
496             _source = source;
497         }
498 
Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)499         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
500         {
501             var sink = new _(observer, cancel);
502             setSink(sink);
503             return _source.SubscribeSafe(sink);
504         }
505 
506         class _ : Sink<decimal?>, IObserver<decimal?>
507         {
508             private decimal _sum;
509             private long _count;
510 
_(IObserver<decimal?> observer, IDisposable cancel)511             public _(IObserver<decimal?> observer, IDisposable cancel)
512                 : base(observer, cancel)
513             {
514                 _sum = 0M;
515                 _count = 0L;
516             }
517 
518             public void OnNext(decimal? value)
519             {
520                 try
521                 {
522                     checked
523                     {
524                         if (value != null)
525                         {
526                             _sum += value.Value;
527                             _count++;
528                         }
529                     }
530                 }
531                 catch (Exception ex)
532                 {
533                     base._observer.OnError(ex);
534                     base.Dispose();
535                 }
536             }
537 
OnError(Exception error)538             public void OnError(Exception error)
539             {
540                 base._observer.OnError(error);
541                 base.Dispose();
542             }
543 
OnCompleted()544             public void OnCompleted()
545             {
546                 if (_count > 0)
547                 {
548                     base._observer.OnNext(_sum / _count);
549                 }
550                 else
551                 {
552                     base._observer.OnNext(null);
553                 }
554 
555                 base._observer.OnCompleted();
556                 base.Dispose();
557             }
558         }
559     }
560 
561     class AverageInt32Nullable : Producer<double?>
562     {
563         private readonly IObservable<int?> _source;
564 
AverageInt32Nullable(IObservable<int?> source)565         public AverageInt32Nullable(IObservable<int?> source)
566         {
567             _source = source;
568         }
569 
Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)570         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
571         {
572             var sink = new _(observer, cancel);
573             setSink(sink);
574             return _source.SubscribeSafe(sink);
575         }
576 
577         class _ : Sink<double?>, IObserver<int?>
578         {
579             private long _sum;
580             private long _count;
581 
_(IObserver<double?> observer, IDisposable cancel)582             public _(IObserver<double?> observer, IDisposable cancel)
583                 : base(observer, cancel)
584             {
585                 _sum = 0L;
586                 _count = 0L;
587             }
588 
OnNext(int? value)589             public void OnNext(int? value)
590             {
591                 try
592                 {
593                     checked
594                     {
595                         if (value != null)
596                         {
597                             _sum += value.Value;
598                             _count++;
599                         }
600                     }
601                 }
602                 catch (Exception ex)
603                 {
604                     base._observer.OnError(ex);
605                     base.Dispose();
606                 }
607             }
608 
OnError(Exception error)609             public void OnError(Exception error)
610             {
611                 base._observer.OnError(error);
612                 base.Dispose();
613             }
614 
OnCompleted()615             public void OnCompleted()
616             {
617                 if (_count > 0)
618                 {
619                     base._observer.OnNext((double)_sum / _count);
620                 }
621                 else
622                 {
623                     base._observer.OnNext(null);
624                 }
625 
626                 base._observer.OnCompleted();
627                 base.Dispose();
628             }
629         }
630     }
631 
632     class AverageInt64Nullable : Producer<double?>
633     {
634         private readonly IObservable<long?> _source;
635 
AverageInt64Nullable(IObservable<long?> source)636         public AverageInt64Nullable(IObservable<long?> source)
637         {
638             _source = source;
639         }
640 
Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)641         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
642         {
643             var sink = new _(observer, cancel);
644             setSink(sink);
645             return _source.SubscribeSafe(sink);
646         }
647 
648         class _ : Sink<double?>, IObserver<long?>
649         {
650             private long _sum;
651             private long _count;
652 
_(IObserver<double?> observer, IDisposable cancel)653             public _(IObserver<double?> observer, IDisposable cancel)
654                 : base(observer, cancel)
655             {
656                 _sum = 0L;
657                 _count = 0L;
658             }
659 
OnNext(long? value)660             public void OnNext(long? value)
661             {
662                 try
663                 {
664                     checked
665                     {
666                         if (value != null)
667                         {
668                             _sum += value.Value;
669                             _count++;
670                         }
671                     }
672                 }
673                 catch (Exception ex)
674                 {
675                     base._observer.OnError(ex);
676                     base.Dispose();
677                 }
678             }
679 
OnError(Exception error)680             public void OnError(Exception error)
681             {
682                 base._observer.OnError(error);
683                 base.Dispose();
684             }
685 
OnCompleted()686             public void OnCompleted()
687             {
688                 if (_count > 0)
689                 {
690                     base._observer.OnNext((double)_sum / _count);
691                 }
692                 else
693                 {
694                     base._observer.OnNext(null);
695                 }
696 
697                 base._observer.OnCompleted();
698                 base.Dispose();
699             }
700         }
701     }
702 }
703 #endif