1 /*
2  * RemoteServerEventListener.java
3  *
4  * Copyright (C) 2021 by RStudio, PBC
5  *
6  * Unless you have received this program directly from RStudio pursuant
7  * to the terms of a commercial license agreement with RStudio, then
8  * this program is licensed to you under the terms of version 3 of the
9  * GNU Affero General Public License. This program is distributed WITHOUT
10  * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11  * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12  * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13  *
14  */
15 package org.rstudio.studio.client.server.remote;
16 
17 import com.google.gwt.core.client.GWT;
18 import com.google.gwt.core.client.JsArray;
19 import com.google.gwt.user.client.Timer;
20 import com.google.gwt.user.client.Window;
21 import com.google.gwt.user.client.Window.ClosingEvent;
22 import com.google.gwt.user.client.Window.ClosingHandler;
23 
24 import org.rstudio.core.client.jsonrpc.RpcError;
25 import org.rstudio.core.client.jsonrpc.RpcRequest;
26 import org.rstudio.core.client.jsonrpc.RpcRequestCallback;
27 import org.rstudio.core.client.jsonrpc.RpcResponse;
28 import org.rstudio.studio.client.application.events.*;
29 import org.rstudio.studio.client.server.ServerError;
30 import org.rstudio.studio.client.server.ServerRequestCallback;
31 
32 import java.util.HashMap;
33 
34 
35 class RemoteServerEventListener
36 {
37    /**
38     * Stores the context needed to complete an async request.
39     */
40    static class AsyncRequestInfo
41    {
AsyncRequestInfo(RpcRequest request, RpcRequestCallback callback)42       AsyncRequestInfo(RpcRequest request, RpcRequestCallback callback)
43       {
44          this.request = request;
45          this.callback = callback;
46       }
47 
48       public final RpcRequest request;
49       public final RpcRequestCallback callback;
50    }
51 
RemoteServerEventListener(RemoteServer server, ClientEventHandler externalEventHandler)52    public RemoteServerEventListener(RemoteServer server,
53                                     ClientEventHandler externalEventHandler)
54    {
55       server_ = server;
56       externalEventHandler_ = externalEventHandler;
57       eventDispatcher_ = new ClientEventDispatcher(server_.getEventBus());
58       lastEventId_ = -1;
59       listenCount_ = 0;
60       listenErrorCount_ = 0;
61       isListening_ = false;
62       sessionWasQuit_ = false;
63 
64       listenTimer_ = new Timer() {
65          @Override
66          public void run()
67          {
68             doListen();
69          }
70       };
71 
72       // we take the liberty of stopping ourselves if the window is on
73       // the verge of being closed. this allows us to prevent the scenario:
74       //
75       //  - window closes and the browser terminates the listener connection
76       //  - onError is called when the connection is terminated -- this results
77       //    in another call to listen() which starts a new connection
78       //  - now we have a "leftover" connection still active with the server
79       //    even after the user has left the page
80       //
81       // we can't use Window CloseEvent because this occurs *after* the
82       // connection is terminated and restarted in onError. we currently
83       // don't handle the ClosingEvent elsewhere in the application so calling
84       // stop() here is as good as calling it in CloseEvent. however, even
85       // if we did handle ClosingEvent and show a prompt which resulted in
86       // the window NOT closing this would still be OK as the event listener
87       // would still be restarted as necessary by the call to ensureEvents
88       //
89       // note that in the future if we need to make sure event listening
90       // is preserved even in the close cancelled case described above
91       // (e.g. for multi-user cases) then we would need to make sure there
92       // is another way to restart the listener (perhaps a global timer
93       // that checks for isListening every few seconds, or perhaps some
94       // abstraction over addWindowClosingHandler that allows "undo" of
95       // things which were closed or shutdown during closing
96       Window.addWindowClosingHandler(new ClosingHandler() {
97          public void onWindowClosing(ClosingEvent event)
98          {
99             stop();
100          }
101       });
102    }
103 
start()104    public void start()
105    {
106       // start should never be called on a running event listener!
107       // (need to protect against extra requests going to the server
108       // and starving the browser of its 2 connections)
109       if (isListening_)
110          stop();
111 
112       // maintain flag indicating that we *should* be listening (allows us to
113       // know when to restart in the case that we are unexpectedly cutoff)
114       isListening_ = true;
115 
116       // reset listen count. this will allow us to delay listening on the
117       // second listen (to prevent the "perpetual loading" problem)
118       listenCount_ = 0;
119 
120       // reset our lastEventId to make sure we get all events which are
121       // currently pending on the server. note in the case of "restarting"
122       // the event listener setting this to -1 could in theory cause us to
123       // receive an event twice (because the reset to -1 causes us to never
124       // confirm receipt of the event with the server). in practice this
125       // would a) be very unlikely; b) not be that big of a deal; and c) is
126       // judged preferable than doing something more complex in this code
127       // which might avoid dupes but cause other bugs (such as missing events
128       // from the server). note also that when we go multi-user we'll be
129       // revisiting this mechanism again so there will be an opportunity to
130       // eliminate this scenario then
131       lastEventId_ = -1;
132 
133       // start listening
134       listen();
135    }
136 
stop()137    public void stop()
138    {
139       listenTimer_.cancel();
140       isListening_ = false;
141       listenCount_ = 0;
142       if (activeRequestCallback_ != null)
143       {
144          activeRequestCallback_.cancel();
145          activeRequestCallback_ = null;
146       }
147       if (activeRequest_ != null)
148       {
149          activeRequest_.cancel();
150          activeRequest_ = null;
151       }
152    }
153 
154    // ensure that we are actively listening for events (used to make
155    // sure that we restart listening when the session is about to resume
156    // after a suspension)
ensureListening(final int attempts)157    public void ensureListening(final int attempts)
158    {
159       // exit if we are now listening
160       if (isListening_)
161          return;
162 
163       // exit if we have already quit or been disconnected
164       if (sessionWasQuit_ || server_.isDisconnected())
165          return;
166 
167       // attempt to start the service
168       start();
169 
170       // if appropriate, schedule another attempt in 250ms
171       final int attemptsRemaining = attempts - 1;
172       if (attemptsRemaining > 0)
173       {
174          new Timer() {
175             public void run()
176             {
177                ensureListening(attemptsRemaining);
178             }
179          }.schedule(250);
180       }
181    }
182 
183    // ensure that events are received during the next short time interval.
184    // this not only starts listening if we aren't currently listening but
185    // also ensures (via a Watchdog) that events are received (and if they
186    // are not received restarts the event listener)
ensureEvents()187    public void ensureEvents()
188    {
189      // if we aren't listening then start us up
190      if (!isListening_)
191      {
192          start();
193      }
194 
195      // if we are listening then use the Watchdog to still make sure we
196      // receive the events even if it requires restarting
197      else
198      {
199         // NOTE: Watchdog is required to work around pathological cases
200         // where the browser has terminated our request for events but
201         // we have not been notified nor can we programmatically detect it.
202         // we need a way to recover and this is it. we have observed this
203         // behavior in webkit if:
204         //
205         //   1) we do not use DeferredCommand/doListen (see below); and
206         //
207         //   2) the user navigates Back within a Frame
208         //
209         // can only imagine that it could happen in other scenarios!
210         if (!watchdog_.isRunning())
211            watchdog_.schedule(kWatchdogIntervalMs);
212      }
213    }
214 
restart()215    private void restart()
216    {
217       stop();
218       start();
219    }
220 
listen()221    private void listen()
222    {
223       // bounce listen to ensure it is never added to the browser's internal
224       // list of requests bound to the current page load. being on this list
225       // (at least in webkit, perhaps in others) results in at least 2 and
226       // perhaps other problems:
227       //
228       //  1) perpetual "Loading..." indicator displayed to user (user can
229       //     also then "cancel" the event request!); and
230       //
231       //  2) termination of the request without warning by the browser when
232       //     the user hits the Back button within a frame hosted on the page
233       //     (note in this case we get no error so think the request is still
234       //     running -- see Watchdog for workaround to this general class of
235       //     issues)
236 
237       // determine bounce ms (do a bigger bounce for the second listen
238       // request as this is the one which gets us stuck in "perpetual loading")
239       int bounceMs = 1;
240       if (++listenCount_ == 2)
241          bounceMs = kSecondListenBounceMs;
242 
243       listenTimer_.schedule(bounceMs);
244    }
245 
doListen()246    private void doListen()
247    {
248       // abort if we are no longer running
249       if (!isListening_)
250          return;
251 
252       // setup request callback (save reference for cancellation)
253       activeRequestCallback_ = new ServerRequestCallback<JsArray<ClientEvent>>()
254       {
255          @Override
256          public void onResponseReceived(JsArray<ClientEvent> events)
257          {
258             // keep watchdog appraised of successful receipt of events
259             watchdog_.cancel();
260 
261             // if we were cancelled (such as if we called stop), do not attempt to process the events
262             // and do not attempt to start listening again (until an explicit call to start is made)
263             if (cancelled())
264                return;
265 
266             try
267             {
268                // only process events if we are still listening
269                if (isListening_ && (events != null))
270                {
271                   for (int i=0; i<events.length(); i++)
272                   {
273                      // we can stop listening in the middle of dispatching
274                      // events (e.g. if we dispatch a Suicide event) so we
275                      // need to check the listening_ flag before each event
276                      // is dispatched
277                      if (!isListening_)
278                         return;
279 
280                      // dispatch event
281                      ClientEvent event = events.get(i);
282                      dispatchEvent(event);
283                      lastEventId_ = event.getId();
284                   }
285                }
286             }
287             // catch all here to make sure that in all cases we call
288             // listen() again after processing
289             catch(Throwable e)
290             {
291                GWT.log("ERROR: Processing client events", e);
292             }
293 
294             // listen for more events
295             listen();
296          }
297 
298          @Override
299          public void onError(ServerError error)
300          {
301             // stop listening for events
302             stop();
303 
304             // if this was server unavailable then signal event and return
305             if (error.getCode() == ServerError.UNAVAILABLE)
306             {
307                ServerUnavailableEvent event = new ServerUnavailableEvent();
308                server_.getEventBus().fireEvent(event);
309                return;
310             }
311 
312             // attempt to restart listening, but throttle restart attempts
313             // in both timing (500ms delay) and quantity (no more than 5
314             // attempts). We do this because unthrottled restart attempts could
315             // result in our server getting hammered with requests)
316             if (listenErrorCount_++ <= 5)
317             {
318                Timer startTimer = new Timer() {
319                   @Override
320                   public void run()
321                   {
322                      // only start again if we haven't been started
323                      // by some other means (e.g. ensureListening, etc)
324                      if (!isListening_)
325                         start();
326                   }
327                };
328                startTimer.schedule(500);
329             }
330             // otherwise reset the listen error count and remain stopped
331             else
332             {
333                listenErrorCount_ = 0;
334             }
335          }
336       };
337 
338       // retry handler (restart listener)
339       RetryHandler retryHandler = new RetryHandler() {
340 
341          public void onRetry()
342          {
343             // need to do a full restart to ensure that the existing
344             // activeRequest_ and activeRequestCallback_ are cleaned up
345             // and all state is reset correctly
346             restart();
347          }
348 
349          public void onModifiedRetry(RpcRequest modifiedRequest)
350          {
351             restart();
352          }
353 
354          public void onError(RpcError error)
355          {
356             // error while attempting to recover, to be on the safe side
357             // we simply stop listening for events. if rather than stopping
358             // we restarted we would open ourselves up to a situation
359             // where we keep hitting the same error over and over again.
360             stop();
361          }
362       };
363 
364       // bump the watchdog timer if it's running
365       if (watchdog_.isRunning())
366          watchdog_.schedule(kWatchdogIntervalMs);
367 
368       // send request
369       activeRequest_ = server_.getEvents(lastEventId_,
370                                          activeRequestCallback_,
371                                          retryHandler);
372    }
373 
374 
dispatchEvent(ClientEvent event)375    private void dispatchEvent(ClientEvent event)
376    {
377       // do some special handling before calling the standard dispatcher
378       String type = event.getType();
379 
380       // we handle async completions directly
381       if (type == ClientEvent.AsyncCompletion)
382       {
383          AsyncCompletion completion = event.getData();
384          String handle = completion.getHandle();
385          AsyncRequestInfo req = asyncRequests_.remove(handle);
386          if (req != null)
387          {
388             req.callback.onResponseReceived(req.request,
389                                             completion.getResponse());
390          }
391          else
392          {
393             // We haven't seen this request yet. Store it for later,
394             // maybe it's just taking a long time for the request
395             // to complete.
396             asyncResponses_.put(handle, completion.getResponse());
397          }
398       }
399       else
400       {
401          // if there is a quit event then we set an internal flag to avoid
402          // ensureListening/ensureEvents calls trying to spark the event
403          // stream back up after the user has quit
404          if (type == ClientEvent.Quit)
405             sessionWasQuit_ = true;
406 
407          // perform standard handling
408          eventDispatcher_.enqueEvent(event);
409 
410          // allow any external handler registered to see the event
411          if (externalEventHandler_ != null)
412             externalEventHandler_.onClientEvent(event);
413       }
414 
415    }
416 
417    // NOTE: the design of the Watchdog likely results in more restarts of
418    // the event service than is optimal. when an rpc call reports that
419    // events are pending and the Watchdog is invoked it is very likely
420    // that the events have already been delivered in response to the
421    // previous poll. In this case the Watchdog "misses" those events which
422    // were already delivered and subsequently assumes that the service
423    // needs to be restarted
424 
425    private class Watchdog extends Timer
426    {
427       @Override
run()428       public void run()
429       {
430          try
431          {
432             // ensure that the workbench wasn't closed while we
433             // were waiting for the timer to run
434             if (!sessionWasQuit_)
435                restart();
436          }
437          catch(Throwable e)
438          {
439             GWT.log("Error restarting event source", e);
440          }
441       }
442    }
443 
registerAsyncHandle(String asyncHandle, RpcRequest request, RpcRequestCallback callback)444    public void registerAsyncHandle(String asyncHandle,
445                                    RpcRequest request,
446                                    RpcRequestCallback callback)
447    {
448       RpcResponse response = asyncResponses_.remove(asyncHandle);
449       if (response == null)
450       {
451          // We don't have the response for this request--this is
452          // the normal case.
453          asyncRequests_.put(asyncHandle,
454                             new AsyncRequestInfo(request, callback));
455       }
456       else
457       {
458          // We already have the response--the request must've taken
459          // a long time to return.
460          callback.onResponseReceived(request, response);
461       }
462    }
463 
464    private final RemoteServer server_;
465 
466    // note: kSecondListenDelayMs must be less than kWatchdogIntervalMs
467    // (by a reasonable margin) to void the watchdog getting involved
468    // unnecessarily during a listen delay
469    private final int kWatchdogIntervalMs = 1000;
470    private final int kSecondListenBounceMs = 250;
471    private Timer listenTimer_;
472 
473    private boolean isListening_;
474    private int lastEventId_;
475    private int listenCount_;
476    private int listenErrorCount_;
477    private boolean sessionWasQuit_;
478 
479    private RpcRequest activeRequest_;
480    private ServerRequestCallback<JsArray<ClientEvent>> activeRequestCallback_;
481 
482    private final ClientEventDispatcher eventDispatcher_;
483 
484    private final ClientEventHandler externalEventHandler_;
485 
486    private Watchdog watchdog_ = new Watchdog();
487 
488    // Stores async requests that expect to be completed later.
489    private final HashMap<String, AsyncRequestInfo> asyncRequests_ = new HashMap<>();
490 
491    // Stores any async responses that didn't have matching requests at the
492    // time they were received. This is to deal with any race conditions where
493    // the completion occurs before we even finished making the request.
494    private final HashMap<String, RpcResponse> asyncResponses_ = new HashMap<>();
495 }
496