1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System;
5 using System.Collections.Generic;
6 
7 namespace System.Reactive.Linq.ObservableImpl
8 {
9     class Max<TSource> : Producer<TSource>
10     {
11         private readonly IObservable<TSource> _source;
12         private readonly IComparer<TSource> _comparer;
13 
Max(IObservable<TSource> source, IComparer<TSource> comparer)14         public Max(IObservable<TSource> source, IComparer<TSource> comparer)
15         {
16             _source = source;
17             _comparer = comparer;
18         }
19 
Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)20         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
21         {
22             // LINQ to Objects makes this distinction in order to make [Max|Max] of an empty collection of reference type objects equal to null.
23             if (default(TSource) == null)
24             {
25                 var sink = new _(this, observer, cancel);
26                 setSink(sink);
27                 return _source.SubscribeSafe(sink);
28             }
29             else
30             {
31                 var sink = new Delta(this, observer, cancel);
32                 setSink(sink);
33                 return _source.SubscribeSafe(sink);
34             }
35         }
36 
37         class Delta : Sink<TSource>, IObserver<TSource>
38         {
39             private readonly Max<TSource> _parent;
40             private bool _hasValue;
41             private TSource _lastValue;
42 
Delta(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)43             public Delta(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
44                 : base(observer, cancel)
45             {
46                 _parent = parent;
47 
48                 _hasValue = false;
49                 _lastValue = default(TSource);
50             }
51 
OnNext(TSource value)52             public void OnNext(TSource value)
53             {
54                 if (_hasValue)
55                 {
56                     var comparison = 0;
57 
58                     try
59                     {
60                         comparison = _parent._comparer.Compare(value, _lastValue);
61                     }
62                     catch (Exception ex)
63                     {
64                         base._observer.OnError(ex);
65                         base.Dispose();
66                         return;
67                     }
68 
69                     if (comparison > 0)
70                     {
71                         _lastValue = value;
72                     }
73                 }
74                 else
75                 {
76                     _hasValue = true;
77                     _lastValue = value;
78                 }
79             }
80 
OnError(Exception error)81             public void OnError(Exception error)
82             {
83                 base._observer.OnError(error);
84                 base.Dispose();
85             }
86 
OnCompleted()87             public void OnCompleted()
88             {
89                 if (!_hasValue)
90                 {
91                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
92                 }
93                 else
94                 {
95                     base._observer.OnNext(_lastValue);
96                     base._observer.OnCompleted();
97                 }
98 
99                 base.Dispose();
100             }
101         }
102 
103         class _ : Sink<TSource>, IObserver<TSource>
104         {
105             private readonly Max<TSource> _parent;
106             private TSource _lastValue;
107 
_(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)108             public _(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
109                 : base(observer, cancel)
110             {
111                 _parent = parent;
112 
113                 _lastValue = default(TSource);
114             }
115 
OnNext(TSource value)116             public void OnNext(TSource value)
117             {
118                 if (value != null)
119                 {
120                     if (_lastValue == null)
121                     {
122                         _lastValue = value;
123                     }
124                     else
125                     {
126                         var comparison = 0;
127 
128                         try
129                         {
130                             comparison = _parent._comparer.Compare(value, _lastValue);
131                         }
132                         catch (Exception ex)
133                         {
134                             base._observer.OnError(ex);
135                             base.Dispose();
136                             return;
137                         }
138 
139                         if (comparison > 0)
140                         {
141                             _lastValue = value;
142                         }
143                     }
144                 }
145             }
146 
OnError(Exception error)147             public void OnError(Exception error)
148             {
149                 base._observer.OnError(error);
150                 base.Dispose();
151             }
152 
OnCompleted()153             public void OnCompleted()
154             {
155                 base._observer.OnNext(_lastValue);
156                 base._observer.OnCompleted();
157                 base.Dispose();
158             }
159         }
160     }
161 
162     class MaxDouble : Producer<double>
163     {
164         private readonly IObservable<double> _source;
165 
MaxDouble(IObservable<double> source)166         public MaxDouble(IObservable<double> source)
167         {
168             _source = source;
169         }
170 
Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)171         protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
172         {
173             var sink = new _(observer, cancel);
174             setSink(sink);
175             return _source.SubscribeSafe(sink);
176         }
177 
178         class _ : Sink<double>, IObserver<double>
179         {
180             private bool _hasValue;
181             private double _lastValue;
182 
_(IObserver<double> observer, IDisposable cancel)183             public _(IObserver<double> observer, IDisposable cancel)
184                 : base(observer, cancel)
185             {
186                 _hasValue = false;
187                 _lastValue = default(double);
188             }
189 
OnNext(double value)190             public void OnNext(double value)
191             {
192                 if (_hasValue)
193                 {
194                     if (value > _lastValue || double.IsNaN(value))
195                     {
196                         _lastValue = value;
197                     }
198                 }
199                 else
200                 {
201                     _lastValue = value;
202                     _hasValue = true;
203                 }
204             }
205 
OnError(Exception error)206             public void OnError(Exception error)
207             {
208                 base._observer.OnError(error);
209                 base.Dispose();
210             }
211 
OnCompleted()212             public void OnCompleted()
213             {
214                 if (!_hasValue)
215                 {
216                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
217                 }
218                 else
219                 {
220                     base._observer.OnNext(_lastValue);
221                     base._observer.OnCompleted();
222                 }
223 
224                 base.Dispose();
225             }
226         }
227     }
228 
229     class MaxSingle : Producer<float>
230     {
231         private readonly IObservable<float> _source;
232 
MaxSingle(IObservable<float> source)233         public MaxSingle(IObservable<float> source)
234         {
235             _source = source;
236         }
237 
Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)238         protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
239         {
240             var sink = new _(observer, cancel);
241             setSink(sink);
242             return _source.SubscribeSafe(sink);
243         }
244 
245         class _ : Sink<float>, IObserver<float>
246         {
247             private bool _hasValue;
248             private float _lastValue;
249 
_(IObserver<float> observer, IDisposable cancel)250             public _(IObserver<float> observer, IDisposable cancel)
251                 : base(observer, cancel)
252             {
253                 _hasValue = false;
254                 _lastValue = default(float);
255             }
256 
OnNext(float value)257             public void OnNext(float value)
258             {
259                 if (_hasValue)
260                 {
261                     if (value > _lastValue || float.IsNaN(value))
262                     {
263                         _lastValue = value;
264                     }
265                 }
266                 else
267                 {
268                     _lastValue = value;
269                     _hasValue = true;
270                 }
271             }
272 
OnError(Exception error)273             public void OnError(Exception error)
274             {
275                 base._observer.OnError(error);
276                 base.Dispose();
277             }
278 
OnCompleted()279             public void OnCompleted()
280             {
281                 if (!_hasValue)
282                 {
283                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
284                 }
285                 else
286                 {
287                     base._observer.OnNext(_lastValue);
288                     base._observer.OnCompleted();
289                 }
290 
291                 base.Dispose();
292             }
293         }
294     }
295 
296     class MaxDecimal : Producer<decimal>
297     {
298         private readonly IObservable<decimal> _source;
299 
MaxDecimal(IObservable<decimal> source)300         public MaxDecimal(IObservable<decimal> source)
301         {
302             _source = source;
303         }
304 
Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)305         protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
306         {
307             var sink = new _(observer, cancel);
308             setSink(sink);
309             return _source.SubscribeSafe(sink);
310         }
311 
312         class _ : Sink<decimal>, IObserver<decimal>
313         {
314             private bool _hasValue;
315             private decimal _lastValue;
316 
_(IObserver<decimal> observer, IDisposable cancel)317             public _(IObserver<decimal> observer, IDisposable cancel)
318                 : base(observer, cancel)
319             {
320                 _hasValue = false;
321                 _lastValue = default(decimal);
322             }
323 
OnNext(decimal value)324             public void OnNext(decimal value)
325             {
326                 if (_hasValue)
327                 {
328                     if (value > _lastValue)
329                     {
330                         _lastValue = value;
331                     }
332                 }
333                 else
334                 {
335                     _lastValue = value;
336                     _hasValue = true;
337                 }
338             }
339 
OnError(Exception error)340             public void OnError(Exception error)
341             {
342                 base._observer.OnError(error);
343                 base.Dispose();
344             }
345 
OnCompleted()346             public void OnCompleted()
347             {
348                 if (!_hasValue)
349                 {
350                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
351                 }
352                 else
353                 {
354                     base._observer.OnNext(_lastValue);
355                     base._observer.OnCompleted();
356                 }
357 
358                 base.Dispose();
359             }
360         }
361     }
362 
363     class MaxInt32 : Producer<int>
364     {
365         private readonly IObservable<int> _source;
366 
MaxInt32(IObservable<int> source)367         public MaxInt32(IObservable<int> source)
368         {
369             _source = source;
370         }
371 
Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)372         protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
373         {
374             var sink = new _(observer, cancel);
375             setSink(sink);
376             return _source.SubscribeSafe(sink);
377         }
378 
379         class _ : Sink<int>, IObserver<int>
380         {
381             private bool _hasValue;
382             private int _lastValue;
383 
_(IObserver<int> observer, IDisposable cancel)384             public _(IObserver<int> observer, IDisposable cancel)
385                 : base(observer, cancel)
386             {
387                 _hasValue = false;
388                 _lastValue = default(int);
389             }
390 
OnNext(int value)391             public void OnNext(int value)
392             {
393                 if (_hasValue)
394                 {
395                     if (value > _lastValue)
396                     {
397                         _lastValue = value;
398                     }
399                 }
400                 else
401                 {
402                     _lastValue = value;
403                     _hasValue = true;
404                 }
405             }
406 
OnError(Exception error)407             public void OnError(Exception error)
408             {
409                 base._observer.OnError(error);
410                 base.Dispose();
411             }
412 
OnCompleted()413             public void OnCompleted()
414             {
415                 if (!_hasValue)
416                 {
417                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
418                 }
419                 else
420                 {
421                     base._observer.OnNext(_lastValue);
422                     base._observer.OnCompleted();
423                 }
424 
425                 base.Dispose();
426             }
427         }
428     }
429 
430     class MaxInt64 : Producer<long>
431     {
432         private readonly IObservable<long> _source;
433 
MaxInt64(IObservable<long> source)434         public MaxInt64(IObservable<long> source)
435         {
436             _source = source;
437         }
438 
Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)439         protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
440         {
441             var sink = new _(observer, cancel);
442             setSink(sink);
443             return _source.SubscribeSafe(sink);
444         }
445 
446         class _ : Sink<long>, IObserver<long>
447         {
448             private bool _hasValue;
449             private long _lastValue;
450 
_(IObserver<long> observer, IDisposable cancel)451             public _(IObserver<long> observer, IDisposable cancel)
452                 : base(observer, cancel)
453             {
454                 _hasValue = false;
455                 _lastValue = default(long);
456             }
457 
OnNext(long value)458             public void OnNext(long value)
459             {
460                 if (_hasValue)
461                 {
462                     if (value > _lastValue)
463                     {
464                         _lastValue = value;
465                     }
466                 }
467                 else
468                 {
469                     _lastValue = value;
470                     _hasValue = true;
471                 }
472             }
473 
OnError(Exception error)474             public void OnError(Exception error)
475             {
476                 base._observer.OnError(error);
477                 base.Dispose();
478             }
479 
OnCompleted()480             public void OnCompleted()
481             {
482                 if (!_hasValue)
483                 {
484                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
485                 }
486                 else
487                 {
488                     base._observer.OnNext(_lastValue);
489                     base._observer.OnCompleted();
490                 }
491 
492                 base.Dispose();
493             }
494         }
495     }
496 
497     class MaxDoubleNullable : Producer<double?>
498     {
499         private readonly IObservable<double?> _source;
500 
MaxDoubleNullable(IObservable<double?> source)501         public MaxDoubleNullable(IObservable<double?> source)
502         {
503             _source = source;
504         }
505 
Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)506         protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
507         {
508             var sink = new _(observer, cancel);
509             setSink(sink);
510             return _source.SubscribeSafe(sink);
511         }
512 
513         class _ : Sink<double?>, IObserver<double?>
514         {
515             private double? _lastValue;
516 
_(IObserver<double?> observer, IDisposable cancel)517             public _(IObserver<double?> observer, IDisposable cancel)
518                 : base(observer, cancel)
519             {
520                 _lastValue = default(double?);
521             }
522 
OnNext(double? value)523             public void OnNext(double? value)
524             {
525                 if (!value.HasValue)
526                     return;
527 
528                 if (_lastValue.HasValue)
529                 {
530                     if (value > _lastValue || double.IsNaN((double)value))
531                     {
532                         _lastValue = value;
533                     }
534                 }
535                 else
536                 {
537                     _lastValue = value;
538                 }
539             }
540 
OnError(Exception error)541             public void OnError(Exception error)
542             {
543                 base._observer.OnError(error);
544                 base.Dispose();
545             }
546 
OnCompleted()547             public void OnCompleted()
548             {
549                 base._observer.OnNext(_lastValue);
550                 base._observer.OnCompleted();
551                 base.Dispose();
552             }
553         }
554     }
555 
556     class MaxSingleNullable : Producer<float?>
557     {
558         private readonly IObservable<float?> _source;
559 
MaxSingleNullable(IObservable<float?> source)560         public MaxSingleNullable(IObservable<float?> source)
561         {
562             _source = source;
563         }
564 
Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)565         protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
566         {
567             var sink = new _(observer, cancel);
568             setSink(sink);
569             return _source.SubscribeSafe(sink);
570         }
571 
572         class _ : Sink<float?>, IObserver<float?>
573         {
574             private float? _lastValue;
575 
_(IObserver<float?> observer, IDisposable cancel)576             public _(IObserver<float?> observer, IDisposable cancel)
577                 : base(observer, cancel)
578             {
579                 _lastValue = default(float?);
580             }
581 
OnNext(float? value)582             public void OnNext(float? value)
583             {
584                 if (!value.HasValue)
585                     return;
586 
587                 if (_lastValue.HasValue)
588                 {
589                     if (value > _lastValue || float.IsNaN((float)value))
590                     {
591                         _lastValue = value;
592                     }
593                 }
594                 else
595                 {
596                     _lastValue = value;
597                 }
598             }
599 
OnError(Exception error)600             public void OnError(Exception error)
601             {
602                 base._observer.OnError(error);
603                 base.Dispose();
604             }
605 
OnCompleted()606             public void OnCompleted()
607             {
608                 base._observer.OnNext(_lastValue);
609                 base._observer.OnCompleted();
610                 base.Dispose();
611             }
612         }
613     }
614 
615     class MaxDecimalNullable : Producer<decimal?>
616     {
617         private readonly IObservable<decimal?> _source;
618 
MaxDecimalNullable(IObservable<decimal?> source)619         public MaxDecimalNullable(IObservable<decimal?> source)
620         {
621             _source = source;
622         }
623 
Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)624         protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
625         {
626             var sink = new _(observer, cancel);
627             setSink(sink);
628             return _source.SubscribeSafe(sink);
629         }
630 
631         class _ : Sink<decimal?>, IObserver<decimal?>
632         {
633             private decimal? _lastValue;
634 
_(IObserver<decimal?> observer, IDisposable cancel)635             public _(IObserver<decimal?> observer, IDisposable cancel)
636                 : base(observer, cancel)
637             {
638                 _lastValue = default(decimal?);
639             }
640 
641             public void OnNext(decimal? value)
642             {
643                 if (!value.HasValue)
644                     return;
645 
646                 if (_lastValue.HasValue)
647                 {
648                     if (value > _lastValue)
649                     {
650                         _lastValue = value;
651                     }
652                 }
653                 else
654                 {
655                     _lastValue = value;
656                 }
657             }
658 
OnError(Exception error)659             public void OnError(Exception error)
660             {
661                 base._observer.OnError(error);
662                 base.Dispose();
663             }
664 
OnCompleted()665             public void OnCompleted()
666             {
667                 base._observer.OnNext(_lastValue);
668                 base._observer.OnCompleted();
669                 base.Dispose();
670             }
671         }
672     }
673 
674     class MaxInt32Nullable : Producer<int?>
675     {
676         private readonly IObservable<int?> _source;
677 
MaxInt32Nullable(IObservable<int?> source)678         public MaxInt32Nullable(IObservable<int?> source)
679         {
680             _source = source;
681         }
682 
Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)683         protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
684         {
685             var sink = new _(observer, cancel);
686             setSink(sink);
687             return _source.SubscribeSafe(sink);
688         }
689 
690         class _ : Sink<int?>, IObserver<int?>
691         {
692             private int? _lastValue;
693 
_(IObserver<int?> observer, IDisposable cancel)694             public _(IObserver<int?> observer, IDisposable cancel)
695                 : base(observer, cancel)
696             {
697                 _lastValue = default(int?);
698             }
699 
OnNext(int? value)700             public void OnNext(int? value)
701             {
702                 if (!value.HasValue)
703                     return;
704 
705                 if (_lastValue.HasValue)
706                 {
707                     if (value > _lastValue)
708                     {
709                         _lastValue = value;
710                     }
711                 }
712                 else
713                 {
714                     _lastValue = value;
715                 }
716             }
717 
OnError(Exception error)718             public void OnError(Exception error)
719             {
720                 base._observer.OnError(error);
721                 base.Dispose();
722             }
723 
OnCompleted()724             public void OnCompleted()
725             {
726                 base._observer.OnNext(_lastValue);
727                 base._observer.OnCompleted();
728                 base.Dispose();
729             }
730         }
731     }
732 
733     class MaxInt64Nullable : Producer<long?>
734     {
735         private readonly IObservable<long?> _source;
736 
MaxInt64Nullable(IObservable<long?> source)737         public MaxInt64Nullable(IObservable<long?> source)
738         {
739             _source = source;
740         }
741 
Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)742         protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
743         {
744             var sink = new _(observer, cancel);
745             setSink(sink);
746             return _source.SubscribeSafe(sink);
747         }
748 
749         class _ : Sink<long?>, IObserver<long?>
750         {
751             private long? _lastValue;
752 
_(IObserver<long?> observer, IDisposable cancel)753             public _(IObserver<long?> observer, IDisposable cancel)
754                 : base(observer, cancel)
755             {
756                 _lastValue = default(long?);
757             }
758 
OnNext(long? value)759             public void OnNext(long? value)
760             {
761                 if (!value.HasValue)
762                     return;
763 
764                 if (_lastValue.HasValue)
765                 {
766                     if (value > _lastValue)
767                     {
768                         _lastValue = value;
769                     }
770                 }
771                 else
772                 {
773                     _lastValue = value;
774                 }
775             }
776 
OnError(Exception error)777             public void OnError(Exception error)
778             {
779                 base._observer.OnError(error);
780                 base.Dispose();
781             }
782 
OnCompleted()783             public void OnCompleted()
784             {
785                 base._observer.OnNext(_lastValue);
786                 base._observer.OnCompleted();
787                 base.Dispose();
788             }
789         }
790     }
791 }
792 #endif