1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #if !NO_PERF 4 using System; 5 6 namespace System.Reactive.Linq.ObservableImpl 7 { 8 class SumDouble : Producer<double> 9 { 10 private readonly IObservable<double> _source; 11 SumDouble(IObservable<double> source)12 public SumDouble(IObservable<double> source) 13 { 14 _source = source; 15 } 16 Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)17 protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink) 18 { 19 var sink = new _(observer, cancel); 20 setSink(sink); 21 return _source.SubscribeSafe(sink); 22 } 23 24 class _ : Sink<double>, IObserver<double> 25 { 26 private double _sum; 27 _(IObserver<double> observer, IDisposable cancel)28 public _(IObserver<double> observer, IDisposable cancel) 29 : base(observer, cancel) 30 { 31 _sum = 0.0; 32 } 33 OnNext(double value)34 public void OnNext(double value) 35 { 36 _sum += value; 37 } 38 OnError(Exception error)39 public void OnError(Exception error) 40 { 41 base._observer.OnError(error); 42 base.Dispose(); 43 } 44 OnCompleted()45 public void OnCompleted() 46 { 47 base._observer.OnNext(_sum); 48 base._observer.OnCompleted(); 49 base.Dispose(); 50 } 51 } 52 } 53 54 class SumSingle : Producer<float> 55 { 56 private readonly IObservable<float> _source; 57 SumSingle(IObservable<float> source)58 public SumSingle(IObservable<float> source) 59 { 60 _source = source; 61 } 62 Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)63 protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink) 64 { 65 var sink = new _(observer, cancel); 66 setSink(sink); 67 return _source.SubscribeSafe(sink); 68 } 69 70 class _ : Sink<float>, IObserver<float> 71 { 72 private double _sum; // This is what LINQ to Objects does! 73 _(IObserver<float> observer, IDisposable cancel)74 public _(IObserver<float> observer, IDisposable cancel) 75 : base(observer, cancel) 76 { 77 _sum = 0.0; // This is what LINQ to Objects does! 78 } 79 OnNext(float value)80 public void OnNext(float value) 81 { 82 _sum += value; // This is what LINQ to Objects does! 83 } 84 OnError(Exception error)85 public void OnError(Exception error) 86 { 87 base._observer.OnError(error); 88 base.Dispose(); 89 } 90 OnCompleted()91 public void OnCompleted() 92 { 93 base._observer.OnNext((float)_sum); // This is what LINQ to Objects does! 94 base._observer.OnCompleted(); 95 base.Dispose(); 96 } 97 } 98 } 99 100 class SumDecimal : Producer<decimal> 101 { 102 private readonly IObservable<decimal> _source; 103 SumDecimal(IObservable<decimal> source)104 public SumDecimal(IObservable<decimal> source) 105 { 106 _source = source; 107 } 108 Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)109 protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink) 110 { 111 var sink = new _(observer, cancel); 112 setSink(sink); 113 return _source.SubscribeSafe(sink); 114 } 115 116 class _ : Sink<decimal>, IObserver<decimal> 117 { 118 private decimal _sum; 119 _(IObserver<decimal> observer, IDisposable cancel)120 public _(IObserver<decimal> observer, IDisposable cancel) 121 : base(observer, cancel) 122 { 123 _sum = 0M; 124 } 125 OnNext(decimal value)126 public void OnNext(decimal value) 127 { 128 _sum += value; 129 } 130 OnError(Exception error)131 public void OnError(Exception error) 132 { 133 base._observer.OnError(error); 134 base.Dispose(); 135 } 136 OnCompleted()137 public void OnCompleted() 138 { 139 base._observer.OnNext(_sum); 140 base._observer.OnCompleted(); 141 base.Dispose(); 142 } 143 } 144 } 145 146 class SumInt32 : Producer<int> 147 { 148 private readonly IObservable<int> _source; 149 SumInt32(IObservable<int> source)150 public SumInt32(IObservable<int> source) 151 { 152 _source = source; 153 } 154 Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)155 protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink) 156 { 157 var sink = new _(observer, cancel); 158 setSink(sink); 159 return _source.SubscribeSafe(sink); 160 } 161 162 class _ : Sink<int>, IObserver<int> 163 { 164 private int _sum; 165 _(IObserver<int> observer, IDisposable cancel)166 public _(IObserver<int> observer, IDisposable cancel) 167 : base(observer, cancel) 168 { 169 _sum = 0; 170 } 171 OnNext(int value)172 public void OnNext(int value) 173 { 174 try 175 { 176 checked 177 { 178 _sum += value; 179 } 180 } 181 catch (Exception exception) 182 { 183 base._observer.OnError(exception); 184 base.Dispose(); 185 } 186 } 187 OnError(Exception error)188 public void OnError(Exception error) 189 { 190 base._observer.OnError(error); 191 base.Dispose(); 192 } 193 OnCompleted()194 public void OnCompleted() 195 { 196 base._observer.OnNext(_sum); 197 base._observer.OnCompleted(); 198 base.Dispose(); 199 } 200 } 201 } 202 203 class SumInt64 : Producer<long> 204 { 205 private readonly IObservable<long> _source; 206 SumInt64(IObservable<long> source)207 public SumInt64(IObservable<long> source) 208 { 209 _source = source; 210 } 211 Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)212 protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink) 213 { 214 var sink = new _(observer, cancel); 215 setSink(sink); 216 return _source.SubscribeSafe(sink); 217 } 218 219 class _ : Sink<long>, IObserver<long> 220 { 221 private long _sum; 222 _(IObserver<long> observer, IDisposable cancel)223 public _(IObserver<long> observer, IDisposable cancel) 224 : base(observer, cancel) 225 { 226 _sum = 0L; 227 } 228 OnNext(long value)229 public void OnNext(long value) 230 { 231 try 232 { 233 checked 234 { 235 _sum += value; 236 } 237 } 238 catch (Exception exception) 239 { 240 base._observer.OnError(exception); 241 base.Dispose(); 242 } 243 } 244 OnError(Exception error)245 public void OnError(Exception error) 246 { 247 base._observer.OnError(error); 248 base.Dispose(); 249 } 250 OnCompleted()251 public void OnCompleted() 252 { 253 base._observer.OnNext(_sum); 254 base._observer.OnCompleted(); 255 base.Dispose(); 256 } 257 } 258 } 259 260 class SumDoubleNullable : Producer<double?> 261 { 262 private readonly IObservable<double?> _source; 263 SumDoubleNullable(IObservable<double?> source)264 public SumDoubleNullable(IObservable<double?> source) 265 { 266 _source = source; 267 } 268 Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)269 protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink) 270 { 271 var sink = new _(observer, cancel); 272 setSink(sink); 273 return _source.SubscribeSafe(sink); 274 } 275 276 class _ : Sink<double?>, IObserver<double?> 277 { 278 private double _sum; 279 _(IObserver<double?> observer, IDisposable cancel)280 public _(IObserver<double?> observer, IDisposable cancel) 281 : base(observer, cancel) 282 { 283 _sum = 0.0; 284 } 285 OnNext(double? value)286 public void OnNext(double? value) 287 { 288 if (value != null) 289 _sum += value.Value; 290 } 291 OnError(Exception error)292 public void OnError(Exception error) 293 { 294 base._observer.OnError(error); 295 base.Dispose(); 296 } 297 OnCompleted()298 public void OnCompleted() 299 { 300 base._observer.OnNext(_sum); 301 base._observer.OnCompleted(); 302 base.Dispose(); 303 } 304 } 305 } 306 307 class SumSingleNullable : Producer<float?> 308 { 309 private readonly IObservable<float?> _source; 310 SumSingleNullable(IObservable<float?> source)311 public SumSingleNullable(IObservable<float?> source) 312 { 313 _source = source; 314 } 315 Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)316 protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink) 317 { 318 var sink = new _(observer, cancel); 319 setSink(sink); 320 return _source.SubscribeSafe(sink); 321 } 322 323 class _ : Sink<float?>, IObserver<float?> 324 { 325 private double _sum; // This is what LINQ to Objects does! 326 _(IObserver<float?> observer, IDisposable cancel)327 public _(IObserver<float?> observer, IDisposable cancel) 328 : base(observer, cancel) 329 { 330 _sum = 0.0; // This is what LINQ to Objects does! 331 } 332 OnNext(float? value)333 public void OnNext(float? value) 334 { 335 if (value != null) 336 _sum += value.Value; // This is what LINQ to Objects does! 337 } 338 OnError(Exception error)339 public void OnError(Exception error) 340 { 341 base._observer.OnError(error); 342 base.Dispose(); 343 } 344 OnCompleted()345 public void OnCompleted() 346 { 347 base._observer.OnNext((float)_sum); // This is what LINQ to Objects does! 348 base._observer.OnCompleted(); 349 base.Dispose(); 350 } 351 } 352 } 353 354 class SumDecimalNullable : Producer<decimal?> 355 { 356 private readonly IObservable<decimal?> _source; 357 SumDecimalNullable(IObservable<decimal?> source)358 public SumDecimalNullable(IObservable<decimal?> source) 359 { 360 _source = source; 361 } 362 Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)363 protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink) 364 { 365 var sink = new _(observer, cancel); 366 setSink(sink); 367 return _source.SubscribeSafe(sink); 368 } 369 370 class _ : Sink<decimal?>, IObserver<decimal?> 371 { 372 private decimal _sum; 373 _(IObserver<decimal?> observer, IDisposable cancel)374 public _(IObserver<decimal?> observer, IDisposable cancel) 375 : base(observer, cancel) 376 { 377 _sum = 0M; 378 } 379 380 public void OnNext(decimal? value) 381 { 382 if (value != null) 383 _sum += value.Value; 384 } 385 OnError(Exception error)386 public void OnError(Exception error) 387 { 388 base._observer.OnError(error); 389 base.Dispose(); 390 } 391 OnCompleted()392 public void OnCompleted() 393 { 394 base._observer.OnNext(_sum); 395 base._observer.OnCompleted(); 396 base.Dispose(); 397 } 398 } 399 } 400 401 class SumInt32Nullable : Producer<int?> 402 { 403 private readonly IObservable<int?> _source; 404 SumInt32Nullable(IObservable<int?> source)405 public SumInt32Nullable(IObservable<int?> source) 406 { 407 _source = source; 408 } 409 Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)410 protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink) 411 { 412 var sink = new _(observer, cancel); 413 setSink(sink); 414 return _source.SubscribeSafe(sink); 415 } 416 417 class _ : Sink<int?>, IObserver<int?> 418 { 419 private int _sum; 420 _(IObserver<int?> observer, IDisposable cancel)421 public _(IObserver<int?> observer, IDisposable cancel) 422 : base(observer, cancel) 423 { 424 _sum = 0; 425 } 426 OnNext(int? value)427 public void OnNext(int? value) 428 { 429 try 430 { 431 checked 432 { 433 if (value != null) 434 _sum += value.Value; 435 } 436 } 437 catch (Exception exception) 438 { 439 base._observer.OnError(exception); 440 base.Dispose(); 441 } 442 } 443 OnError(Exception error)444 public void OnError(Exception error) 445 { 446 base._observer.OnError(error); 447 base.Dispose(); 448 } 449 OnCompleted()450 public void OnCompleted() 451 { 452 base._observer.OnNext(_sum); 453 base._observer.OnCompleted(); 454 base.Dispose(); 455 } 456 } 457 } 458 459 class SumInt64Nullable : Producer<long?> 460 { 461 private readonly IObservable<long?> _source; 462 SumInt64Nullable(IObservable<long?> source)463 public SumInt64Nullable(IObservable<long?> source) 464 { 465 _source = source; 466 } 467 Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)468 protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink) 469 { 470 var sink = new _(observer, cancel); 471 setSink(sink); 472 return _source.SubscribeSafe(sink); 473 } 474 475 class _ : Sink<long?>, IObserver<long?> 476 { 477 private long _sum; 478 _(IObserver<long?> observer, IDisposable cancel)479 public _(IObserver<long?> observer, IDisposable cancel) 480 : base(observer, cancel) 481 { 482 _sum = 0L; 483 } 484 OnNext(long? value)485 public void OnNext(long? value) 486 { 487 try 488 { 489 checked 490 { 491 if (value != null) 492 _sum += value.Value; 493 } 494 } 495 catch (Exception exception) 496 { 497 base._observer.OnError(exception); 498 base.Dispose(); 499 } 500 } 501 OnError(Exception error)502 public void OnError(Exception error) 503 { 504 base._observer.OnError(error); 505 base.Dispose(); 506 } 507 OnCompleted()508 public void OnCompleted() 509 { 510 base._observer.OnNext(_sum); 511 base._observer.OnCompleted(); 512 base.Dispose(); 513 } 514 } 515 } 516 } 517 #endif