// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

#if !NO_TPL
using System.Threading;
using System.Threading.Tasks;
#endif

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that merges the elements of many inner sources into a single output sequence.
    /// Depending on the constructor used, the inner sources are observable sequences
    /// (optionally limited to a maximum number of concurrent subscriptions) or tasks.
    /// </summary>
    /// <typeparam name="TSource">Element type of the inner sources and of the merged output.</typeparam>
    class Merge<TSource> : Producer<TSource>
    {
        // Outer sequence of inner observables; left null by the task-based constructor.
        private readonly IObservable<IObservable<TSource>> _sources;

        // Concurrency limit; stays 0 unless the (sources, maxConcurrent) constructor was used.
        private readonly int _maxConcurrent;

        /// <summary>
        /// Creates a merge over a sequence of observable sequences without a concurrency limit.
        /// </summary>
        /// <param name="sources">Outer sequence producing the inner sequences to merge.</param>
        public Merge(IObservable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        /// <summary>
        /// Creates a merge over a sequence of observable sequences with a concurrency limit.
        /// </summary>
        /// <param name="sources">Outer sequence producing the inner sequences to merge.</param>
        /// <param name="maxConcurrent">
        /// Maximum number of inner sequences subscribed to at the same time. The limit is only
        /// honored when the value is greater than zero (see <c>Run</c>).
        /// </param>
        public Merge(IObservable<IObservable<TSource>> sources, int maxConcurrent)
        {
            _sources = sources;
            _maxConcurrent = maxConcurrent;
        }

#if !NO_TPL
        // Outer sequence of tasks; only set by the task-based constructor.
        private readonly IObservable<Task<TSource>> _sourcesT;

        /// <summary>
        /// Creates a merge over a sequence of tasks.
        /// </summary>
        /// <param name="sources">Outer sequence producing the tasks whose results are merged.</param>
        public Merge(IObservable<Task<TSource>> sources)
        {
            _sourcesT = sources;
        }
#endif

        /// <summary>
        /// Creates the sink matching the way this instance was constructed, publishes it through
        /// <paramref name="setSink"/> and starts it.
        /// </summary>
        /// <param name="observer">Observer receiving the merged elements.</param>
        /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
        /// <param name="setSink">Callback that receives the created sink.</param>
        /// <returns>The disposable returned by the selected sink's <c>Run</c> method.</returns>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            // The concurrency-limited sink takes precedence, then the task-based sink,
            // and finally the unbounded observable-of-observables sink.
            if (_maxConcurrent > 0)
            {
                var sink = new MergeConcurrent(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
#if !NO_TPL
            else if (_sourcesT != null)
            {
                var sink = new MergeImpl(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
#endif
            else
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
        }

        /// <summary>
        /// Sink for the unbounded merge: every inner sequence is subscribed to as soon as it arrives.
        /// </summary>
        class _ : Sink<TSource>, IObserver<IObservable<TSource>>
        {
            private readonly Merge<TSource> _parent;

            public _(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            // Serializes every call made on the downstream observer.
            private object _gate;

            // Set once the outer sequence has completed.
            private bool _isStopped;

            // Holds the outer subscription plus one entry per inner subscription that has not
            // completed yet. The outer entry is never removed, so Count == 1 means that no
            // inner subscription is left.
            private CompositeDisposable _group;

            private SingleAssignmentDisposable _sourceSubscription;

            /// <summary>
            /// Initializes the sink state and subscribes to the outer sequence.
            /// </summary>
            /// <returns>The composite holding the outer and all inner subscriptions.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _isStopped = false;
                _group = new CompositeDisposable();

                // Register the outer subscription before subscribing, so that the Count == 1
                // checks also hold for notifications delivered during the SubscribeSafe call.
                _sourceSubscription = new SingleAssignmentDisposable();
                _group.Add(_sourceSubscription);
                _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);

                return _group;
            }

            /// <summary>
            /// Subscribes to a newly arrived inner sequence and tracks its subscription in the group.
            /// </summary>
            /// <param name="value">Inner sequence to merge.</param>
            public void OnNext(IObservable<TSource> value)
            {
                // Add to the group before subscribing, so the inner observer can remove
                // itself even when the inner sequence completes during SubscribeSafe.
                var innerSubscription = new SingleAssignmentDisposable();
                _group.Add(innerSubscription);
                innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription));
            }

            /// <summary>
            /// Forwards an error of the outer sequence and disposes the sink.
            /// </summary>
            /// <param name="error">Error reported by the outer sequence.</param>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Marks the outer sequence as finished. Completes the output right away when no inner
            /// subscription is left; otherwise only the outer subscription is disposed and the
            /// last inner sequence to complete finishes the output.
            /// </summary>
            public void OnCompleted()
            {
                _isStopped = true;
                if (_group.Count == 1)
                {
                    //
                    // Notice there can be a race between OnCompleted of the source and any
                    // of the inner sequences, where both see _group.Count == 1, and one is
                    // waiting for the lock. There won't be a double OnCompleted observation
                    // though, because the call to Dispose silences the observer by swapping
                    // in a NopObserver<T>.
                    //
                    lock (_gate)
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                    }
                }
                else
                {
                    // Disposed but deliberately left in the group, so the Count == 1 check in
                    // Iter.OnCompleted keeps counting the outer entry.
                    _sourceSubscription.Dispose();
                }
            }

            /// <summary>
            /// Observer attached to a single inner sequence of the unbounded merge.
            /// </summary>
            class Iter : IObserver<TSource>
            {
                private readonly _ _parent;

                // This observer's own subscription entry in the parent's group.
                private readonly IDisposable _self;

                public Iter(_ parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                }

                /// <summary>Forwards an element to the downstream observer under the parent's gate.</summary>
                /// <param name="value">Element produced by the inner sequence.</param>
                public void OnNext(TSource value)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnNext(value);
                    }
                }

                /// <summary>Forwards an inner error and disposes the parent sink.</summary>
                /// <param name="error">Error reported by the inner sequence.</param>
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                /// <summary>
                /// Removes this inner subscription from the group and completes the output when the
                /// outer sequence has stopped and no other inner subscription remains.
                /// </summary>
                public void OnCompleted()
                {
                    _parent._group.Remove(_self);
                    if (_parent._isStopped && _parent._group.Count == 1)
                    {
                        //
                        // Notice there can be a race between OnCompleted of the source and any
                        // of the inner sequences, where both see _group.Count == 1, and one is
                        // waiting for the lock. There won't be a double OnCompleted observation
                        // though, because the call to Dispose silences the observer by swapping
                        // in a NopObserver<T>.
                        //
                        lock (_parent._gate)
                        {
                            _parent._observer.OnCompleted();
                            _parent.Dispose();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Sink for the concurrency-limited merge: at most <c>_maxConcurrent</c> inner sequences
        /// are subscribed to at once, and further ones wait in a queue.
        /// </summary>
        class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>>
        {
            private readonly Merge<TSource> _parent;

            public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            // Guards _q, _activeCount and _isStopped, and serializes calls on the downstream observer.
            private object _gate;

            // Inner sequences that arrived while the concurrency limit was reached.
            private Queue<IObservable<TSource>> _q;

            // Set once the outer sequence has completed.
            private bool _isStopped;

            private SingleAssignmentDisposable _sourceSubscription;
            private CompositeDisposable _group;

            // Number of concurrency slots currently in use; assigned in Run.
            private int _activeCount;

            /// <summary>
            /// Initializes the sink state and subscribes to the outer sequence.
            /// </summary>
            /// <returns>The composite holding the outer and all inner subscriptions.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _q = new Queue<IObservable<TSource>>();
                _isStopped = false;
                _activeCount = 0;

                // Completion in this sink is decided by _activeCount rather than by the size of
                // the group, so the outer subscription is registered after subscribing.
                _group = new CompositeDisposable();
                _sourceSubscription = new SingleAssignmentDisposable();
                _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);
                _group.Add(_sourceSubscription);

                return _group;
            }

            /// <summary>
            /// Subscribes to the inner sequence when a concurrency slot is free; otherwise queues it.
            /// </summary>
            /// <param name="value">Inner sequence to merge.</param>
            public void OnNext(IObservable<TSource> value)
            {
                lock (_gate)
                {
                    if (_activeCount < _parent._maxConcurrent)
                    {
                        _activeCount++;
                        Subscribe(value);
                    }
                    else
                    {
                        _q.Enqueue(value);
                    }
                }
            }

            /// <summary>
            /// Forwards an error of the outer sequence and disposes the sink.
            /// </summary>
            /// <param name="error">Error reported by the outer sequence.</param>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Marks the outer sequence as finished. Completes the output when no inner sequence is
            /// active; otherwise only the outer subscription is disposed and the last active inner
            /// sequence finishes the output.
            /// </summary>
            public void OnCompleted()
            {
                lock (_gate)
                {
                    _isStopped = true;
                    if (_activeCount == 0)
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                    }
                    else
                    {
                        _sourceSubscription.Dispose();
                    }
                }
            }

            /// <summary>
            /// Subscribes to an inner sequence and tracks the subscription in the group.
            /// Every call site in this class invokes it while holding <c>_gate</c>.
            /// </summary>
            /// <param name="innerSource">Inner sequence to subscribe to.</param>
            private void Subscribe(IObservable<TSource> innerSource)
            {
                var subscription = new SingleAssignmentDisposable();
                _group.Add(subscription);
                subscription.Disposable = innerSource.SubscribeSafe(new Iter(this, subscription));
            }

            /// <summary>
            /// Observer attached to a single inner sequence of the concurrency-limited merge.
            /// </summary>
            class Iter : IObserver<TSource>
            {
                private readonly MergeConcurrent _parent;

                // This observer's own subscription entry in the parent's group.
                private readonly IDisposable _self;

                public Iter(MergeConcurrent parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                }

                /// <summary>Forwards an element to the downstream observer under the parent's gate.</summary>
                /// <param name="value">Element produced by the inner sequence.</param>
                public void OnNext(TSource value)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnNext(value);
                    }
                }

                /// <summary>Forwards an inner error and disposes the parent sink.</summary>
                /// <param name="error">Error reported by the inner sequence.</param>
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                /// <summary>
                /// Releases this inner subscription. A queued inner sequence, if any, takes over the
                /// freed slot; otherwise the slot is released and the output completes once the
                /// outer sequence has stopped and no slot is in use.
                /// </summary>
                public void OnCompleted()
                {
                    _parent._group.Remove(_self);
                    lock (_parent._gate)
                    {
                        if (_parent._q.Count > 0)
                        {
                            // The slot is handed over directly, so _activeCount stays unchanged.
                            var s = _parent._q.Dequeue();
                            _parent.Subscribe(s);
                        }
                        else
                        {
                            _parent._activeCount--;
                            if (_parent._isStopped && _parent._activeCount == 0)
                            {
                                _parent._observer.OnCompleted();
                                _parent.Dispose();
                            }
                        }
                    }
                }
            }
        }

#if !NO_TPL
#pragma warning disable 0420
        /// <summary>
        /// Sink for the task-based merge: the result of every task is forwarded as one element.
        /// </summary>
        class MergeImpl : Sink<TSource>, IObserver<Task<TSource>>
        {
            private readonly Merge<TSource> _parent;

            public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            // Serializes every call made on the downstream observer.
            private object _gate;

            // Outstanding completions: one for the outer sequence plus one per received task
            // that has not yet run to completion.
            private volatile int _count;

            /// <summary>
            /// Initializes the sink state and subscribes to the outer sequence of tasks.
            /// </summary>
            /// <returns>The subscription to the outer sequence.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _count = 1; // accounts for the outer sequence itself

                return _parent._sourcesT.SubscribeSafe(this);
            }

            /// <summary>
            /// Registers a task and forwards its outcome, immediately when the task has already
            /// completed or through a continuation otherwise.
            /// </summary>
            /// <param name="value">Task whose result is merged into the output.</param>
            public void OnNext(Task<TSource> value)
            {
                Interlocked.Increment(ref _count);
                if (value.IsCompleted)
                {
                    OnCompletedTask(value);
                }
                else
                {
                    value.ContinueWith(OnCompletedTask);
                }
            }

            /// <summary>
            /// Translates the final state of a task into observer notifications: the result on
            /// success, the inner exception on fault, a <see cref="TaskCanceledException"/> on
            /// cancellation.
            /// </summary>
            /// <param name="task">Task that has reached a final state.</param>
            private void OnCompletedTask(Task<TSource> task)
            {
                switch (task.Status)
                {
                    case TaskStatus.RanToCompletion:
                        {
                            lock (_gate)
                            {
                                base._observer.OnNext(task.Result);
                            }

                            // Outside the lock: releases this task's share of the count and
                            // completes the output when it was the last outstanding one.
                            OnCompleted();
                        }
                        break;
                    case TaskStatus.Faulted:
                        {
                            lock (_gate)
                            {
                                base._observer.OnError(task.Exception.InnerException);
                                base.Dispose();
                            }
                        }
                        break;
                    case TaskStatus.Canceled:
                        {
                            lock (_gate)
                            {
                                base._observer.OnError(new TaskCanceledException(task));
                                base.Dispose();
                            }
                        }
                        break;
                }
            }

            /// <summary>
            /// Forwards an error of the outer sequence and disposes the sink.
            /// </summary>
            /// <param name="error">Error reported by the outer sequence.</param>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Decrements the outstanding count and completes the output when it reaches zero.
            /// Called for the completion of the outer sequence and, from
            /// <c>OnCompletedTask</c>, for every task that ran to completion.
            /// </summary>
            public void OnCompleted()
            {
                if (Interlocked.Decrement(ref _count) == 0)
                {
                    lock (_gate)
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                    }
                }
            }
        }
#pragma warning restore 0420
#endif
    }
}
#endif