1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System;
5 using System.Collections.Generic;
6 using System.Reactive.Disposables;
7 
8 #if !NO_TPL
9 using System.Threading;
10 using System.Threading.Tasks;
11 #endif
12 
13 namespace System.Reactive.Linq.ObservableImpl
14 {
15     class Merge<TSource> : Producer<TSource>
16     {
17         private readonly IObservable<IObservable<TSource>> _sources;
18         private readonly int _maxConcurrent;
19 
//
// Builds a merge operator over an outer sequence of inner observable sequences.
// No concurrency limit is recorded by this overload, so _maxConcurrent keeps its
// default value of zero.
//
public Merge(IObservable<IObservable<TSource>> sources)
{
    this._sources = sources;
}
24 
//
// Builds a merge operator over an outer sequence of inner observable sequences,
// remembering the requested maximum number of concurrently subscribed inner
// sequences. Run only takes the concurrency-limited path when this value is
// greater than zero.
//
public Merge(IObservable<IObservable<TSource>> sources, int maxConcurrent)
    : this(sources)
{
    this._maxConcurrent = maxConcurrent;
}
30 
31 #if !NO_TPL
32         private readonly IObservable<Task<TSource>> _sourcesT;
33 
//
// Builds a merge operator over a sequence of tasks; the result of each task is
// surfaced as an element of the merged sequence.
//
public Merge(IObservable<Task<TSource>> sources)
{
    this._sourcesT = sources;
}
38 #endif
39 
//
// Picks the sink matching the way this operator was constructed, hands it to
// setSink, and starts it. The checks are made in this order:
//   1. a positive concurrency limit -> MergeConcurrent
//   2. a task-based source (TPL builds only) -> MergeImpl
//   3. otherwise -> the unbounded sink _
//
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
{
    if (_maxConcurrent > 0)
    {
        var concurrentSink = new MergeConcurrent(this, observer, cancel);
        setSink(concurrentSink);
        return concurrentSink.Run();
    }

#if !NO_TPL
    if (_sourcesT != null)
    {
        var taskSink = new MergeImpl(this, observer, cancel);
        setSink(taskSink);
        return taskSink.Run();
    }
#endif

    var unboundedSink = new _(this, observer, cancel);
    setSink(unboundedSink);
    return unboundedSink.Run();
}
63 
//
// Sink that merges every inner sequence of the outer source as soon as it
// arrives, without any limit on the number of concurrent subscriptions.
//
class _ : Sink<TSource>, IObserver<IObservable<TSource>>
{
    private readonly Merge<TSource> _parent;

    public _(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
        : base(observer, cancel)
    {
        _parent = parent;
    }

    // Serializes all notifications sent to the downstream observer and guards
    // the completion handshake between the outer source and the inner sequences.
    private object _gate;

    // Set once the outer source has completed. Only accessed while holding _gate.
    private bool _isStopped;

    // Holds the outer subscription plus one entry per inner sequence that has
    // not completed yet.
    private CompositeDisposable _group;
    private SingleAssignmentDisposable _sourceSubscription;

    //
    // Subscribes to the outer source and returns the group that owns all
    // subscriptions made by this sink.
    //
    public IDisposable Run()
    {
        _gate = new object();
        _isStopped = false;
        _group = new CompositeDisposable();

        //
        // The outer subscription is registered in the group before subscribing
        // and is never removed from it afterwards. That is why the completion
        // checks below compare the group count against 1 rather than 0.
        //
        _sourceSubscription = new SingleAssignmentDisposable();
        _group.Add(_sourceSubscription);
        _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);

        return _group;
    }

    //
    // A new inner sequence arrived: track its subscription in the group and
    // subscribe to it right away.
    //
    public void OnNext(IObservable<TSource> value)
    {
        var innerSubscription = new SingleAssignmentDisposable();
        _group.Add(innerSubscription);
        innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription));
    }

    //
    // The outer source failed: forward the error and tear everything down.
    //
    public void OnError(Exception error)
    {
        lock (_gate)
        {
            base._observer.OnError(error);
            base.Dispose();
        }
    }

    //
    // The outer source completed. The merged sequence completes now if no
    // inner sequence is left; otherwise the last inner sequence to complete
    // will send the completion.
    //
    public void OnCompleted()
    {
        var completedHere = false;

        //
        // Setting the flag and inspecting the group count must happen as one
        // step with respect to Iter.OnCompleted, which performs the mirrored
        // check under the same gate. Doing either side without the lock allows
        // both threads to act on stale values and both to skip the completion,
        // leaving the merged sequence without a terminal notification.
        //
        lock (_gate)
        {
            _isStopped = true;
            if (_group.Count == 1)
            {
                completedHere = true;
                base._observer.OnCompleted();
                base.Dispose();
            }
        }

        // Inner sequences are still running; only the outer subscription is
        // released. This is done outside the gate, so the gate is not held
        // during the disposal.
        if (!completedHere)
        {
            _sourceSubscription.Dispose();
        }
    }

    //
    // Observer attached to a single inner sequence.
    //
    class Iter : IObserver<TSource>
    {
        private readonly _ _parent;

        // The group entry that tracks this inner subscription.
        private readonly IDisposable _self;

        public Iter(_ parent, IDisposable self)
        {
            _parent = parent;
            _self = self;
        }

        public void OnNext(TSource value)
        {
            lock (_parent._gate)
            {
                _parent._observer.OnNext(value);
            }
        }

        public void OnError(Exception error)
        {
            lock (_parent._gate)
            {
                _parent._observer.OnError(error);
                _parent.Dispose();
            }
        }

        public void OnCompleted()
        {
            // Removal stays outside the gate; only the completion check
            // itself needs to be synchronized.
            _parent._group.Remove(_self);

            //
            // Mirror of the check in the outer OnCompleted. Because both checks
            // run under the gate, whichever of the two runs last observes both
            // the stopped flag and the reduced group count. If both happen to
            // see the terminal condition, the second call is harmless because
            // Dispose silences the observer by swapping in a NopObserver<T>.
            //
            lock (_parent._gate)
            {
                if (_parent._isStopped && _parent._group.Count == 1)
                {
                    _parent._observer.OnCompleted();
                    _parent.Dispose();
                }
            }
        }
    }
}
179 
//
// Sink that merges inner sequences while keeping at most _maxConcurrent of
// them subscribed at any time. Inner sequences that arrive while the limit is
// reached wait in a queue until a running one completes.
//
class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>>
{
    private readonly Merge<TSource> _parent;

    public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
        : base(observer, cancel)
    {
        _parent = parent;
    }

    // Guards the queue, the counters and every call to the downstream observer.
    private object _gate;

    // Inner sequences waiting for a free slot.
    private Queue<IObservable<TSource>> _q;

    // Set once the outer source has completed.
    private bool _isStopped;
    private SingleAssignmentDisposable _sourceSubscription;
    private CompositeDisposable _group;

    // Number of slots currently taken by subscribed inner sequences.
    private int _activeCount = 0;

    //
    // Resets the bookkeeping, subscribes to the outer source and returns the
    // group owning all subscriptions made by this sink.
    //
    public IDisposable Run()
    {
        _activeCount = 0;
        _isStopped = false;
        _q = new Queue<IObservable<TSource>>();
        _gate = new object();

        _group = new CompositeDisposable();
        _sourceSubscription = new SingleAssignmentDisposable();
        _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);
        _group.Add(_sourceSubscription);

        return _group;
    }

    //
    // A new inner sequence arrived: start it when a slot is free, park it in
    // the queue otherwise.
    //
    public void OnNext(IObservable<TSource> value)
    {
        lock (_gate)
        {
            if (_activeCount >= _parent._maxConcurrent)
            {
                _q.Enqueue(value);
                return;
            }

            _activeCount++;
            Subscribe(value);
        }
    }

    //
    // The outer source failed: forward the error and tear everything down.
    //
    public void OnError(Exception error)
    {
        lock (_gate)
        {
            base._observer.OnError(error);
            base.Dispose();
        }
    }

    //
    // The outer source completed. When inner sequences are still active only
    // the outer subscription is released; the last inner sequence to finish
    // sends the completion.
    //
    public void OnCompleted()
    {
        lock (_gate)
        {
            _isStopped = true;

            if (_activeCount != 0)
            {
                _sourceSubscription.Dispose();
                return;
            }

            base._observer.OnCompleted();
            base.Dispose();
        }
    }

    //
    // Subscribes to one inner sequence, registering the subscription in the
    // group before the subscribe call is made.
    //
    private void Subscribe(IObservable<TSource> innerSource)
    {
        var slot = new SingleAssignmentDisposable();
        _group.Add(slot);

        var innerObserver = new Iter(this, slot);
        slot.Disposable = innerSource.SubscribeSafe(innerObserver);
    }

    //
    // Observer attached to a single inner sequence.
    //
    class Iter : IObserver<TSource>
    {
        private readonly MergeConcurrent _parent;

        // The group entry that tracks this inner subscription.
        private readonly IDisposable _self;

        public Iter(MergeConcurrent parent, IDisposable self)
        {
            _parent = parent;
            _self = self;
        }

        public void OnNext(TSource value)
        {
            lock (_parent._gate)
            {
                _parent._observer.OnNext(value);
            }
        }

        public void OnError(Exception error)
        {
            lock (_parent._gate)
            {
                _parent._observer.OnError(error);
                _parent.Dispose();
            }
        }

        public void OnCompleted()
        {
            _parent._group.Remove(_self);

            lock (_parent._gate)
            {
                // Hand the freed slot straight to the next queued sequence;
                // the active count stays the same in that case.
                if (_parent._q.Count > 0)
                {
                    var next = _parent._q.Dequeue();
                    _parent.Subscribe(next);
                    return;
                }

                // Nothing is waiting, so the slot is simply released.
                _parent._activeCount--;

                if (!_parent._isStopped || _parent._activeCount != 0)
                {
                    return;
                }

                // The outer source is done and this was the last inner sequence.
                _parent._observer.OnCompleted();
                _parent.Dispose();
            }
        }
    }
}
308 
309 #if !NO_TPL
310 #pragma warning disable 0420
//
// Sink that merges the results of a sequence of tasks. Every task contributes
// its result as one element; a faulted or canceled task terminates the merged
// sequence with an error.
//
class MergeImpl : Sink<TSource>, IObserver<Task<TSource>>
{
    private readonly Merge<TSource> _parent;

    // Serializes all calls to the downstream observer.
    private object _gate;

    // Outstanding work items: one for the outer sequence plus one for each
    // task that has been received but not yet accounted for.
    private volatile int _count;

    public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
        : base(observer, cancel)
    {
        _parent = parent;
    }

    //
    // Subscribes to the task source. The count starts at one, which stands for
    // the outer sequence itself and is released by its OnCompleted call.
    //
    public IDisposable Run()
    {
        _count = 1;
        _gate = new object();

        return _parent._sourcesT.SubscribeSafe(this);
    }

    //
    // A task arrived: count it, then handle it immediately when it has already
    // finished or attach a continuation otherwise.
    //
    public void OnNext(Task<TSource> value)
    {
        Interlocked.Increment(ref _count);

        if (!value.IsCompleted)
        {
            value.ContinueWith(OnCompletedTask);
            return;
        }

        OnCompletedTask(value);
    }

    //
    // Translates the final state of a task into observer notifications.
    // Statuses other than the three handled below are ignored.
    //
    private void OnCompletedTask(Task<TSource> task)
    {
        var status = task.Status;

        if (status == TaskStatus.RanToCompletion)
        {
            var result = task.Result;

            lock (_gate)
            {
                base._observer.OnNext(result);
            }

            // Releases the count taken for this task in OnNext.
            OnCompleted();
        }
        else if (status == TaskStatus.Faulted)
        {
            // OnError forwards the error and disposes the sink under the gate.
            OnError(task.Exception.InnerException);
        }
        else if (status == TaskStatus.Canceled)
        {
            OnError(new TaskCanceledException(task));
        }
    }

    //
    // Forwards an error, from the outer source or from a task, and tears the
    // sink down.
    //
    public void OnError(Exception error)
    {
        lock (_gate)
        {
            base._observer.OnError(error);
            base.Dispose();
        }
    }

    //
    // Releases one unit of outstanding work. Called when the outer source
    // completes and after each successfully completed task; the caller that
    // brings the count down to zero sends the completion downstream.
    //
    public void OnCompleted()
    {
        if (Interlocked.Decrement(ref _count) != 0)
        {
            return;
        }

        lock (_gate)
        {
            base._observer.OnCompleted();
            base.Dispose();
        }
    }
}
399 #pragma warning restore 0420
400 #endif
401     }
402 }
403 #endif