1from rx.core import Observer, Observable, AnonymousObservable, Disposable
2from rx.internal import extensionmethod
3from rx.disposables import CompositeDisposable
4
5
@extensionmethod(Observable, alias="tap")
def do_action(self, on_next=None, on_error=None, on_completed=None,
              observer=None):
    """Invokes an action for each element in the observable sequence and
    invokes an action on graceful or exceptional termination of the
    observable sequence. This method can be used for debugging, logging,
    etc. of query behavior by intercepting the message stream to run
    arbitrary actions for messages on the pipeline.

    1 - observable.do_action(observer)
    2 - observable.do_action(on_next)
    3 - observable.do_action(on_next, on_error)
    4 - observable.do_action(on_next, on_error, on_completed)

    observer -- [Optional] Observer whose on_next, on_error and
        on_completed methods are used as the three actions. An Observer
        passed as the first positional argument (in place of on_next) is
        handled the same way.
    on_next -- [Optional] Action to invoke for each element in the
        observable sequence.
    on_error -- [Optional] Action to invoke on exceptional termination
        of the observable sequence.
    on_completed -- [Optional] Action to invoke on graceful termination
        of the observable sequence.

    If an action raises, the raised exception is sent downstream through
    on_error and the notification that triggered the action is not
    forwarded.

    Returns the source sequence with the side-effecting behavior applied.
    """

    source = self

    # Unpack an Observer into plain callbacks so the wrappers below only
    # ever deal with callables (or None).
    if isinstance(observer, Observer):
        on_next = observer.on_next
        on_error = observer.on_error
        on_completed = observer.on_completed
    elif isinstance(on_next, Observer):
        # on_next is rebound last because it still holds the Observer here.
        on_error = on_next.on_error
        on_completed = on_next.on_completed
        on_next = on_next.on_next

    def subscribe(observer):
        def _on_next(x):
            if on_next:
                try:
                    on_next(x)
                except Exception as e:
                    # The action failed: report it and do NOT emit x
                    # after the terminal error notification.
                    observer.on_error(e)
                    return

            observer.on_next(x)

        def _on_error(exception):
            if on_error:
                try:
                    on_error(exception)
                except Exception as e:
                    # Only one terminal notification may be sent; the
                    # failure of the action takes precedence.
                    observer.on_error(e)
                    return

            observer.on_error(exception)

        def _on_completed():
            if on_completed:
                try:
                    on_completed()
                except Exception as e:
                    # Do not signal completion after signalling an error.
                    observer.on_error(e)
                    return

            observer.on_completed()

        return source.subscribe(_on_next, _on_error, _on_completed)

    return AnonymousObservable(subscribe)
79
80
@extensionmethod(Observable)
def do_after_next(self, after_next):
    """Runs an action for every element once that element has already been
    passed to the downstream observer. Useful for debugging, logging and
    similar side effects.

    after_next -- Action called with each element after it was emitted

    Returns an observable that forwards every notification of the source.
    """
    source = self

    def subscribe(observer):

        def emit_then_run(item):
            # Both the downstream emission and the action are guarded: an
            # exception raised by either one is routed to on_error.
            try:
                observer.on_next(item)
                after_next(item)
            except Exception as exc:
                observer.on_error(exc)

        return source.subscribe(
            emit_then_run,
            observer.on_error,
            observer.on_completed,
        )

    return AnonymousObservable(subscribe)
101
102
@extensionmethod(Observable)
def do_on_subscribe(self, on_subscribe):
    """Runs an action each time the resulting observable is subscribed to.
    Useful for debugging, logging and similar side effects at the start of
    an operation.

    on_subscribe -- Action (no arguments) called on every subscription

    Returns an observable that forwards every notification of the source.
    """
    source = self

    def subscribe(observer):
        # The action runs first; the source is subscribed to only after it
        # returns, so an exception raised here prevents the subscription.
        on_subscribe()
        return source.subscribe(
            observer.on_next,
            observer.on_error,
            observer.on_completed,
        )

    return AnonymousObservable(subscribe)
115
116
@extensionmethod(Observable)
def do_on_dispose(self, on_dispose):
    """Runs an action when the subscription is disposed.
    Useful for debugging, logging and similar side effects tied to the
    disposal of an operation.

    on_dispose -- Action (no arguments) called on disposal

    Returns an observable that forwards every notification of the source.
    """
    source = self

    class DisposeHook(Disposable):
        """Disposable whose only effect is to call on_dispose."""

        def dispose(self):
            on_dispose()

    def subscribe(observer):
        bundle = CompositeDisposable()
        # The hook is registered before the source is subscribed to.
        bundle.add(DisposeHook())
        upstream = source.subscribe(
            observer.on_next,
            observer.on_error,
            observer.on_completed,
        )
        bundle.add(upstream)
        return bundle

    return AnonymousObservable(subscribe)
138
139
@extensionmethod(Observable)
def do_on_terminate(self, on_terminate):
    """Runs an action when the source terminates, before the terminal
    notification (on_completed or on_error) is passed downstream.
    Useful for debugging, logging and similar side effects when completion
    or an error ends an operation.

    on_terminate -- Action (no arguments) called when on_completed or
        on_error arrives from the source

    If the action raises, its exception is sent downstream through on_error
    in place of the original terminal notification.
    """
    source = self

    def subscribe(observer):

        def completed_hook():
            try:
                on_terminate()
            except Exception as failure:
                observer.on_error(failure)
                return
            observer.on_completed()

        def error_hook(exception):
            try:
                on_terminate()
            except Exception as failure:
                observer.on_error(failure)
                return
            observer.on_error(exception)

        return source.subscribe(observer.on_next, error_hook, completed_hook)

    return AnonymousObservable(subscribe)
170
171
@extensionmethod(Observable)
def do_after_terminate(self, after_terminate):
    """Runs an action after the terminal notification (on_completed or
    on_error) has been passed downstream.
    Useful for debugging, logging and similar side effects when completion
    or an error ends an operation.

    after_terminate -- Action (no arguments) called after on_completed or
        on_error was forwarded to the observer

    If the action raises, its exception is passed to the observer's
    on_error.
    """
    source = self

    def subscribe(observer):

        def run_action():
            # Shared tail of both terminal paths.
            try:
                after_terminate()
            except Exception as failure:
                observer.on_error(failure)

        def completed_hook():
            observer.on_completed()
            run_action()

        def error_hook(exception):
            observer.on_error(exception)
            run_action()

        return source.subscribe(observer.on_next, error_hook, completed_hook)

    return AnonymousObservable(subscribe)
199
200
@extensionmethod(Observable)
def do_finally(self, finally_action):
    """Invokes an action after an on_complete(), on_error(), or disposal
    event occurs.
    This can be helpful for debugging, logging, and other side effects when
    completion, an error, or disposal terminates an operation.
    Note this operator will strive to execute the finally_action once per
    subscription, and prevent any redundant calls.

    finally_action -- Action to invoke after on_complete, on_error, or
        disposal is called

    If finally_action raises on the completion or error path, the exception
    is passed to the observer's on_error; on the disposal path it
    propagates out of dispose().
    """

    class OnDispose(Disposable):
        """Disposable that runs the given callable when disposed."""

        def __init__(self, action):
            self.action = action

        def dispose(self):
            self.action()

    def subscribe(observer):

        # One-element list used as a mutable cell shared by the closures
        # below (one flag per subscription).
        was_invoked = [False]

        def invoke_once():
            if was_invoked[0]:
                return
            # Mark as invoked BEFORE calling the action: if the action
            # raises, or re-entrantly triggers disposal/termination, it
            # must still not be run a second time.
            was_invoked[0] = True
            finally_action()

        def on_completed():
            observer.on_completed()
            try:
                invoke_once()
            except Exception as err:
                observer.on_error(err)

        def on_error(exception):
            observer.on_error(exception)
            try:
                invoke_once()
            except Exception as err:
                observer.on_error(err)

        composite_disposable = CompositeDisposable()
        # The disposal hook is registered before subscribing to the source.
        composite_disposable.add(OnDispose(invoke_once))
        disposable = self.subscribe(observer.on_next, on_error, on_completed)
        composite_disposable.add(disposable)

        return composite_disposable

    return AnonymousObservable(subscribe)
249
250