1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System;
5 
6 namespace System.Reactive.Linq.ObservableImpl
7 {
8     class SumDouble : Producer<double>
9     {
10         private readonly IObservable<double> _source;
11 
SumDouble(IObservable<double> source)12         public SumDouble(IObservable<double> source)
13         {
14             _source = source;
15         }
16 
Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)17         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
18         {
19             var sink = new _(observer, cancel);
20             setSink(sink);
21             return _source.SubscribeSafe(sink);
22         }
23 
24         class _ : Sink<double>, IObserver<double>
25         {
26             private double _sum;
27 
_(IObserver<double> observer, IDisposable cancel)28             public _(IObserver<double> observer, IDisposable cancel)
29                 : base(observer, cancel)
30             {
31                 _sum = 0.0;
32             }
33 
OnNext(double value)34             public void OnNext(double value)
35             {
36                 _sum += value;
37             }
38 
OnError(Exception error)39             public void OnError(Exception error)
40             {
41                 base._observer.OnError(error);
42                 base.Dispose();
43             }
44 
OnCompleted()45             public void OnCompleted()
46             {
47                 base._observer.OnNext(_sum);
48                 base._observer.OnCompleted();
49                 base.Dispose();
50             }
51         }
52     }
53 
54     class SumSingle : Producer<float>
55     {
56         private readonly IObservable<float> _source;
57 
SumSingle(IObservable<float> source)58         public SumSingle(IObservable<float> source)
59         {
60             _source = source;
61         }
62 
Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)63         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
64         {
65             var sink = new _(observer, cancel);
66             setSink(sink);
67             return _source.SubscribeSafe(sink);
68         }
69 
70         class _ : Sink<float>, IObserver<float>
71         {
72             private double _sum; // This is what LINQ to Objects does!
73 
_(IObserver<float> observer, IDisposable cancel)74             public _(IObserver<float> observer, IDisposable cancel)
75                 : base(observer, cancel)
76             {
77                 _sum = 0.0; // This is what LINQ to Objects does!
78             }
79 
OnNext(float value)80             public void OnNext(float value)
81             {
82                 _sum += value; // This is what LINQ to Objects does!
83             }
84 
OnError(Exception error)85             public void OnError(Exception error)
86             {
87                 base._observer.OnError(error);
88                 base.Dispose();
89             }
90 
OnCompleted()91             public void OnCompleted()
92             {
93                 base._observer.OnNext((float)_sum); // This is what LINQ to Objects does!
94                 base._observer.OnCompleted();
95                 base.Dispose();
96             }
97         }
98     }
99 
100     class SumDecimal : Producer<decimal>
101     {
102         private readonly IObservable<decimal> _source;
103 
SumDecimal(IObservable<decimal> source)104         public SumDecimal(IObservable<decimal> source)
105         {
106             _source = source;
107         }
108 
Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)109         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
110         {
111             var sink = new _(observer, cancel);
112             setSink(sink);
113             return _source.SubscribeSafe(sink);
114         }
115 
116         class _ : Sink<decimal>, IObserver<decimal>
117         {
118             private decimal _sum;
119 
_(IObserver<decimal> observer, IDisposable cancel)120             public _(IObserver<decimal> observer, IDisposable cancel)
121                 : base(observer, cancel)
122             {
123                 _sum = 0M;
124             }
125 
OnNext(decimal value)126             public void OnNext(decimal value)
127             {
128                 _sum += value;
129             }
130 
OnError(Exception error)131             public void OnError(Exception error)
132             {
133                 base._observer.OnError(error);
134                 base.Dispose();
135             }
136 
OnCompleted()137             public void OnCompleted()
138             {
139                 base._observer.OnNext(_sum);
140                 base._observer.OnCompleted();
141                 base.Dispose();
142             }
143         }
144     }
145 
146     class SumInt32 : Producer<int>
147     {
148         private readonly IObservable<int> _source;
149 
SumInt32(IObservable<int> source)150         public SumInt32(IObservable<int> source)
151         {
152             _source = source;
153         }
154 
Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)155         protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
156         {
157             var sink = new _(observer, cancel);
158             setSink(sink);
159             return _source.SubscribeSafe(sink);
160         }
161 
162         class _ : Sink<int>, IObserver<int>
163         {
164             private int _sum;
165 
_(IObserver<int> observer, IDisposable cancel)166             public _(IObserver<int> observer, IDisposable cancel)
167                 : base(observer, cancel)
168             {
169                 _sum = 0;
170             }
171 
OnNext(int value)172             public void OnNext(int value)
173             {
174                 try
175                 {
176                     checked
177                     {
178                         _sum += value;
179                     }
180                 }
181                 catch (Exception exception)
182                 {
183                     base._observer.OnError(exception);
184                     base.Dispose();
185                 }
186             }
187 
OnError(Exception error)188             public void OnError(Exception error)
189             {
190                 base._observer.OnError(error);
191                 base.Dispose();
192             }
193 
OnCompleted()194             public void OnCompleted()
195             {
196                 base._observer.OnNext(_sum);
197                 base._observer.OnCompleted();
198                 base.Dispose();
199             }
200         }
201     }
202 
203     class SumInt64 : Producer<long>
204     {
205         private readonly IObservable<long> _source;
206 
SumInt64(IObservable<long> source)207         public SumInt64(IObservable<long> source)
208         {
209             _source = source;
210         }
211 
Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)212         protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
213         {
214             var sink = new _(observer, cancel);
215             setSink(sink);
216             return _source.SubscribeSafe(sink);
217         }
218 
219         class _ : Sink<long>, IObserver<long>
220         {
221             private long _sum;
222 
_(IObserver<long> observer, IDisposable cancel)223             public _(IObserver<long> observer, IDisposable cancel)
224                 : base(observer, cancel)
225             {
226                 _sum = 0L;
227             }
228 
OnNext(long value)229             public void OnNext(long value)
230             {
231                 try
232                 {
233                     checked
234                     {
235                         _sum += value;
236                     }
237                 }
238                 catch (Exception exception)
239                 {
240                     base._observer.OnError(exception);
241                     base.Dispose();
242                 }
243             }
244 
OnError(Exception error)245             public void OnError(Exception error)
246             {
247                 base._observer.OnError(error);
248                 base.Dispose();
249             }
250 
OnCompleted()251             public void OnCompleted()
252             {
253                 base._observer.OnNext(_sum);
254                 base._observer.OnCompleted();
255                 base.Dispose();
256             }
257         }
258     }
259 
260     class SumDoubleNullable : Producer<double?>
261     {
262         private readonly IObservable<double?> _source;
263 
SumDoubleNullable(IObservable<double?> source)264         public SumDoubleNullable(IObservable<double?> source)
265         {
266             _source = source;
267         }
268 
Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)269         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
270         {
271             var sink = new _(observer, cancel);
272             setSink(sink);
273             return _source.SubscribeSafe(sink);
274         }
275 
276         class _ : Sink<double?>, IObserver<double?>
277         {
278             private double _sum;
279 
_(IObserver<double?> observer, IDisposable cancel)280             public _(IObserver<double?> observer, IDisposable cancel)
281                 : base(observer, cancel)
282             {
283                 _sum = 0.0;
284             }
285 
OnNext(double? value)286             public void OnNext(double? value)
287             {
288                 if (value != null)
289                     _sum += value.Value;
290             }
291 
OnError(Exception error)292             public void OnError(Exception error)
293             {
294                 base._observer.OnError(error);
295                 base.Dispose();
296             }
297 
OnCompleted()298             public void OnCompleted()
299             {
300                 base._observer.OnNext(_sum);
301                 base._observer.OnCompleted();
302                 base.Dispose();
303             }
304         }
305     }
306 
307     class SumSingleNullable : Producer<float?>
308     {
309         private readonly IObservable<float?> _source;
310 
SumSingleNullable(IObservable<float?> source)311         public SumSingleNullable(IObservable<float?> source)
312         {
313             _source = source;
314         }
315 
Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)316         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
317         {
318             var sink = new _(observer, cancel);
319             setSink(sink);
320             return _source.SubscribeSafe(sink);
321         }
322 
323         class _ : Sink<float?>, IObserver<float?>
324         {
325             private double _sum; // This is what LINQ to Objects does!
326 
_(IObserver<float?> observer, IDisposable cancel)327             public _(IObserver<float?> observer, IDisposable cancel)
328                 : base(observer, cancel)
329             {
330                 _sum = 0.0; // This is what LINQ to Objects does!
331             }
332 
OnNext(float? value)333             public void OnNext(float? value)
334             {
335                 if (value != null)
336                     _sum += value.Value; // This is what LINQ to Objects does!
337             }
338 
OnError(Exception error)339             public void OnError(Exception error)
340             {
341                 base._observer.OnError(error);
342                 base.Dispose();
343             }
344 
OnCompleted()345             public void OnCompleted()
346             {
347                 base._observer.OnNext((float)_sum); // This is what LINQ to Objects does!
348                 base._observer.OnCompleted();
349                 base.Dispose();
350             }
351         }
352     }
353 
354     class SumDecimalNullable : Producer<decimal?>
355     {
356         private readonly IObservable<decimal?> _source;
357 
SumDecimalNullable(IObservable<decimal?> source)358         public SumDecimalNullable(IObservable<decimal?> source)
359         {
360             _source = source;
361         }
362 
Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)363         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
364         {
365             var sink = new _(observer, cancel);
366             setSink(sink);
367             return _source.SubscribeSafe(sink);
368         }
369 
370         class _ : Sink<decimal?>, IObserver<decimal?>
371         {
372             private decimal _sum;
373 
_(IObserver<decimal?> observer, IDisposable cancel)374             public _(IObserver<decimal?> observer, IDisposable cancel)
375                 : base(observer, cancel)
376             {
377                 _sum = 0M;
378             }
379 
380             public void OnNext(decimal? value)
381             {
382                 if (value != null)
383                     _sum += value.Value;
384             }
385 
OnError(Exception error)386             public void OnError(Exception error)
387             {
388                 base._observer.OnError(error);
389                 base.Dispose();
390             }
391 
OnCompleted()392             public void OnCompleted()
393             {
394                 base._observer.OnNext(_sum);
395                 base._observer.OnCompleted();
396                 base.Dispose();
397             }
398         }
399     }
400 
401     class SumInt32Nullable : Producer<int?>
402     {
403         private readonly IObservable<int?> _source;
404 
SumInt32Nullable(IObservable<int?> source)405         public SumInt32Nullable(IObservable<int?> source)
406         {
407             _source = source;
408         }
409 
Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)410         protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
411         {
412             var sink = new _(observer, cancel);
413             setSink(sink);
414             return _source.SubscribeSafe(sink);
415         }
416 
417         class _ : Sink<int?>, IObserver<int?>
418         {
419             private int _sum;
420 
_(IObserver<int?> observer, IDisposable cancel)421             public _(IObserver<int?> observer, IDisposable cancel)
422                 : base(observer, cancel)
423             {
424                 _sum = 0;
425             }
426 
OnNext(int? value)427             public void OnNext(int? value)
428             {
429                 try
430                 {
431                     checked
432                     {
433                         if (value != null)
434                             _sum += value.Value;
435                     }
436                 }
437                 catch (Exception exception)
438                 {
439                     base._observer.OnError(exception);
440                     base.Dispose();
441                 }
442             }
443 
OnError(Exception error)444             public void OnError(Exception error)
445             {
446                 base._observer.OnError(error);
447                 base.Dispose();
448             }
449 
OnCompleted()450             public void OnCompleted()
451             {
452                 base._observer.OnNext(_sum);
453                 base._observer.OnCompleted();
454                 base.Dispose();
455             }
456         }
457     }
458 
459     class SumInt64Nullable : Producer<long?>
460     {
461         private readonly IObservable<long?> _source;
462 
SumInt64Nullable(IObservable<long?> source)463         public SumInt64Nullable(IObservable<long?> source)
464         {
465             _source = source;
466         }
467 
Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)468         protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
469         {
470             var sink = new _(observer, cancel);
471             setSink(sink);
472             return _source.SubscribeSafe(sink);
473         }
474 
475         class _ : Sink<long?>, IObserver<long?>
476         {
477             private long _sum;
478 
_(IObserver<long?> observer, IDisposable cancel)479             public _(IObserver<long?> observer, IDisposable cancel)
480                 : base(observer, cancel)
481             {
482                 _sum = 0L;
483             }
484 
OnNext(long? value)485             public void OnNext(long? value)
486             {
487                 try
488                 {
489                     checked
490                     {
491                         if (value != null)
492                             _sum += value.Value;
493                     }
494                 }
495                 catch (Exception exception)
496                 {
497                     base._observer.OnError(exception);
498                     base.Dispose();
499                 }
500             }
501 
OnError(Exception error)502             public void OnError(Exception error)
503             {
504                 base._observer.OnError(error);
505                 base.Dispose();
506             }
507 
OnCompleted()508             public void OnCompleted()
509             {
510                 base._observer.OnNext(_sum);
511                 base._observer.OnCompleted();
512                 base.Dispose();
513             }
514         }
515     }
516 }
517 #endif