1from rx.core import Observer, Observable, AnonymousObservable, Disposable 2from rx.internal import extensionmethod 3from rx.disposables import CompositeDisposable 4 5 6@extensionmethod(Observable, alias="tap") 7def do_action(self, on_next=None, on_error=None, on_completed=None, 8 observer=None): 9 """Invokes an action for each element in the observable sequence and 10 invokes an action on graceful or exceptional termination of the 11 observable sequence. This method can be used for debugging, logging, 12 etc. of query behavior by intercepting the message stream to run 13 arbitrary actions for messages on the pipeline. 14 15 1 - observable.do_action(observer) 16 2 - observable.do_action(on_next) 17 3 - observable.do_action(on_next, on_error) 18 4 - observable.do_action(on_next, on_error, on_completed) 19 20 observer -- [Optional] Observer, or ... 21 on_next -- [Optional] Action to invoke for each element in the 22 observable sequence. 23 on_error -- [Optional] Action to invoke on exceptional termination 24 of the observable sequence. 25 on_completed -- [Optional] Action to invoke on graceful termination 26 of the observable sequence. 27 28 Returns the source sequence with the side-effecting behavior applied. 29 """ 30 31 source = self 32 33 if isinstance(observer, Observer): 34 on_next = observer.on_next 35 on_error = observer.on_error 36 on_completed = observer.on_completed 37 elif isinstance(on_next, Observer): 38 on_error = on_next.on_error 39 on_completed = on_next.on_completed 40 on_next = on_next.on_next 41 42 def subscribe(observer): 43 def _on_next(x): 44 if not on_next: 45 observer.on_next(x) 46 else: 47 try: 48 on_next(x) 49 except Exception as e: 50 observer.on_error(e) 51 52 observer.on_next(x) 53 54 def _on_error(exception): 55 if not on_error: 56 observer.on_error(exception) 57 else: 58 try: 59 on_error(exception) 60 except Exception as e: 61 observer.on_error(e) 62 63 observer.on_error(exception) 64 65 def _on_completed(): 66 if not on_completed: 67 observer.on_completed() 68 else: 69 try: 70 on_completed() 71 except Exception as e: 72 observer.on_error(e) 73 74 observer.on_completed() 75 76 return source.subscribe(_on_next, _on_error, _on_completed) 77 78 return AnonymousObservable(subscribe) 79 80 81@extensionmethod(Observable) 82def do_after_next(self, after_next): 83 """Invokes an action with each element after it has been emitted downstream. 84 This can be helpful for debugging, logging, and other side effects. 85 86 after_next -- Action to invoke on each element after it has been emitted 87 """ 88 89 def subscribe(observer): 90 91 def on_next(value): 92 try: 93 observer.on_next(value) 94 after_next(value) 95 except Exception as e: 96 observer.on_error(e) 97 98 return self.subscribe(on_next, observer.on_error, observer.on_completed) 99 100 return AnonymousObservable(subscribe) 101 102 103@extensionmethod(Observable) 104def do_on_subscribe(self, on_subscribe): 105 """Invokes an action on subscription. 106 This can be helpful for debugging, logging, and other side effects on the start of an operation. 107 108 on_subscribe -- Action to invoke on subscription 109 """ 110 def subscribe(observer): 111 on_subscribe() 112 return self.subscribe(observer.on_next, observer.on_error, observer.on_completed) 113 114 return AnonymousObservable(subscribe) 115 116 117@extensionmethod(Observable) 118def do_on_dispose(self, on_dispose): 119 """Invokes an action on disposal. 120 This can be helpful for debugging, logging, and other side effects on the disposal of an operation. 121 122 123 on_dispose -- Action to invoke on disposal 124 """ 125 126 class OnDispose(Disposable): 127 def dispose(self): 128 on_dispose() 129 130 def subscribe(observer): 131 composite_disposable = CompositeDisposable() 132 composite_disposable.add(OnDispose()) 133 disposable = self.subscribe(observer.on_next, observer.on_error, observer.on_completed) 134 composite_disposable.add(disposable) 135 return composite_disposable 136 137 return AnonymousObservable(subscribe) 138 139 140@extensionmethod(Observable) 141def do_on_terminate(self, on_terminate): 142 """Invokes an action on an on_complete() or on_error() event. 143 This can be helpful for debugging, logging, and other side effects when completion or an error terminates an operation. 144 145 146 on_terminate -- Action to invoke when on_complete or on_error is called 147 """ 148 149 def subscribe(observer): 150 151 def on_completed(): 152 try: 153 on_terminate() 154 except Exception as err: 155 observer.on_error(err) 156 else: 157 observer.on_completed() 158 159 def on_error(exception): 160 try: 161 on_terminate() 162 except Exception as err: 163 observer.on_error(err) 164 else: 165 observer.on_error(exception) 166 167 return self.subscribe(observer.on_next, on_error, on_completed) 168 169 return AnonymousObservable(subscribe) 170 171 172@extensionmethod(Observable) 173def do_after_terminate(self, after_terminate): 174 """Invokes an action after an on_complete() or on_error() event. 175 This can be helpful for debugging, logging, and other side effects when completion or an error terminates an operation 176 177 178 on_terminate -- Action to invoke after on_complete or on_error is called 179 """ 180 def subscribe(observer): 181 182 def on_completed(): 183 observer.on_completed() 184 try: 185 after_terminate() 186 except Exception as err: 187 observer.on_error(err) 188 189 def on_error(exception): 190 observer.on_error(exception) 191 try: 192 after_terminate() 193 except Exception as err: 194 observer.on_error(err) 195 196 return self.subscribe(observer.on_next, on_error, on_completed) 197 198 return AnonymousObservable(subscribe) 199 200 201@extensionmethod(Observable) 202def do_finally(self, finally_action): 203 """Invokes an action after an on_complete(), on_error(), or disposal event occurs 204 This can be helpful for debugging, logging, and other side effects when completion, an error, or disposal terminates an operation. 205 Note this operator will strive to execute the finally_action once, and prevent any redudant calls 206 207 finally_action -- Action to invoke after on_complete, on_error, or disposal is called 208 """ 209 210 class OnDispose(Disposable): 211 def __init__(self, was_invoked): 212 self.was_invoked = was_invoked 213 214 def dispose(self): 215 if not self.was_invoked[0]: 216 finally_action() 217 self.was_invoked[0] = True 218 219 def subscribe(observer): 220 221 was_invoked = [False] 222 223 def on_completed(): 224 observer.on_completed() 225 try: 226 if not was_invoked[0]: 227 finally_action() 228 was_invoked[0] = True 229 except Exception as err: 230 observer.on_error(err) 231 232 def on_error(exception): 233 observer.on_error(exception) 234 try: 235 if not was_invoked[0]: 236 finally_action() 237 was_invoked[0] = True 238 except Exception as err: 239 observer.on_error(err) 240 241 composite_disposable = CompositeDisposable() 242 composite_disposable.add(OnDispose(was_invoked)) 243 disposable = self.subscribe(observer.on_next, on_error, on_completed) 244 composite_disposable.add(disposable) 245 246 return composite_disposable 247 248 return AnonymousObservable(subscribe) 249 250