1 /* 2 * RemoteServerEventListener.java 3 * 4 * Copyright (C) 2021 by RStudio, PBC 5 * 6 * Unless you have received this program directly from RStudio pursuant 7 * to the terms of a commercial license agreement with RStudio, then 8 * this program is licensed to you under the terms of version 3 of the 9 * GNU Affero General Public License. This program is distributed WITHOUT 10 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT, 11 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the 12 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details. 13 * 14 */ 15 package org.rstudio.studio.client.server.remote; 16 17 import com.google.gwt.core.client.GWT; 18 import com.google.gwt.core.client.JsArray; 19 import com.google.gwt.user.client.Timer; 20 import com.google.gwt.user.client.Window; 21 import com.google.gwt.user.client.Window.ClosingEvent; 22 import com.google.gwt.user.client.Window.ClosingHandler; 23 24 import org.rstudio.core.client.jsonrpc.RpcError; 25 import org.rstudio.core.client.jsonrpc.RpcRequest; 26 import org.rstudio.core.client.jsonrpc.RpcRequestCallback; 27 import org.rstudio.core.client.jsonrpc.RpcResponse; 28 import org.rstudio.studio.client.application.events.*; 29 import org.rstudio.studio.client.server.ServerError; 30 import org.rstudio.studio.client.server.ServerRequestCallback; 31 32 import java.util.HashMap; 33 34 35 class RemoteServerEventListener 36 { 37 /** 38 * Stores the context needed to complete an async request. 39 */ 40 static class AsyncRequestInfo 41 { AsyncRequestInfo(RpcRequest request, RpcRequestCallback callback)42 AsyncRequestInfo(RpcRequest request, RpcRequestCallback callback) 43 { 44 this.request = request; 45 this.callback = callback; 46 } 47 48 public final RpcRequest request; 49 public final RpcRequestCallback callback; 50 } 51 RemoteServerEventListener(RemoteServer server, ClientEventHandler externalEventHandler)52 public RemoteServerEventListener(RemoteServer server, 53 ClientEventHandler externalEventHandler) 54 { 55 server_ = server; 56 externalEventHandler_ = externalEventHandler; 57 eventDispatcher_ = new ClientEventDispatcher(server_.getEventBus()); 58 lastEventId_ = -1; 59 listenCount_ = 0; 60 listenErrorCount_ = 0; 61 isListening_ = false; 62 sessionWasQuit_ = false; 63 64 listenTimer_ = new Timer() { 65 @Override 66 public void run() 67 { 68 doListen(); 69 } 70 }; 71 72 // we take the liberty of stopping ourselves if the window is on 73 // the verge of being closed. this allows us to prevent the scenario: 74 // 75 // - window closes and the browser terminates the listener connection 76 // - onError is called when the connection is terminated -- this results 77 // in another call to listen() which starts a new connection 78 // - now we have a "leftover" connection still active with the server 79 // even after the user has left the page 80 // 81 // we can't use Window CloseEvent because this occurs *after* the 82 // connection is terminated and restarted in onError. we currently 83 // don't handle the ClosingEvent elsewhere in the application so calling 84 // stop() here is as good as calling it in CloseEvent. however, even 85 // if we did handle ClosingEvent and show a prompt which resulted in 86 // the window NOT closing this would still be OK as the event listener 87 // would still be restarted as necessary by the call to ensureEvents 88 // 89 // note that in the future if we need to make sure event listening 90 // is preserved even in the close cancelled case described above 91 // (e.g. for multi-user cases) then we would need to make sure there 92 // is another way to restart the listener (perhaps a global timer 93 // that checks for isListening every few seconds, or perhaps some 94 // abstraction over addWindowClosingHandler that allows "undo" of 95 // things which were closed or shutdown during closing 96 Window.addWindowClosingHandler(new ClosingHandler() { 97 public void onWindowClosing(ClosingEvent event) 98 { 99 stop(); 100 } 101 }); 102 } 103 start()104 public void start() 105 { 106 // start should never be called on a running event listener! 107 // (need to protect against extra requests going to the server 108 // and starving the browser of its 2 connections) 109 if (isListening_) 110 stop(); 111 112 // maintain flag indicating that we *should* be listening (allows us to 113 // know when to restart in the case that we are unexpectedly cutoff) 114 isListening_ = true; 115 116 // reset listen count. this will allow us to delay listening on the 117 // second listen (to prevent the "perpetual loading" problem) 118 listenCount_ = 0; 119 120 // reset our lastEventId to make sure we get all events which are 121 // currently pending on the server. note in the case of "restarting" 122 // the event listener setting this to -1 could in theory cause us to 123 // receive an event twice (because the reset to -1 causes us to never 124 // confirm receipt of the event with the server). in practice this 125 // would a) be very unlikely; b) not be that big of a deal; and c) is 126 // judged preferable than doing something more complex in this code 127 // which might avoid dupes but cause other bugs (such as missing events 128 // from the server). note also that when we go multi-user we'll be 129 // revisiting this mechanism again so there will be an opportunity to 130 // eliminate this scenario then 131 lastEventId_ = -1; 132 133 // start listening 134 listen(); 135 } 136 stop()137 public void stop() 138 { 139 listenTimer_.cancel(); 140 isListening_ = false; 141 listenCount_ = 0; 142 if (activeRequestCallback_ != null) 143 { 144 activeRequestCallback_.cancel(); 145 activeRequestCallback_ = null; 146 } 147 if (activeRequest_ != null) 148 { 149 activeRequest_.cancel(); 150 activeRequest_ = null; 151 } 152 } 153 154 // ensure that we are actively listening for events (used to make 155 // sure that we restart listening when the session is about to resume 156 // after a suspension) ensureListening(final int attempts)157 public void ensureListening(final int attempts) 158 { 159 // exit if we are now listening 160 if (isListening_) 161 return; 162 163 // exit if we have already quit or been disconnected 164 if (sessionWasQuit_ || server_.isDisconnected()) 165 return; 166 167 // attempt to start the service 168 start(); 169 170 // if appropriate, schedule another attempt in 250ms 171 final int attemptsRemaining = attempts - 1; 172 if (attemptsRemaining > 0) 173 { 174 new Timer() { 175 public void run() 176 { 177 ensureListening(attemptsRemaining); 178 } 179 }.schedule(250); 180 } 181 } 182 183 // ensure that events are received during the next short time interval. 184 // this not only starts listening if we aren't currently listening but 185 // also ensures (via a Watchdog) that events are received (and if they 186 // are not received restarts the event listener) ensureEvents()187 public void ensureEvents() 188 { 189 // if we aren't listening then start us up 190 if (!isListening_) 191 { 192 start(); 193 } 194 195 // if we are listening then use the Watchdog to still make sure we 196 // receive the events even if it requires restarting 197 else 198 { 199 // NOTE: Watchdog is required to work around pathological cases 200 // where the browser has terminated our request for events but 201 // we have not been notified nor can we programmatically detect it. 202 // we need a way to recover and this is it. we have observed this 203 // behavior in webkit if: 204 // 205 // 1) we do not use DeferredCommand/doListen (see below); and 206 // 207 // 2) the user navigates Back within a Frame 208 // 209 // can only imagine that it could happen in other scenarios! 210 if (!watchdog_.isRunning()) 211 watchdog_.schedule(kWatchdogIntervalMs); 212 } 213 } 214 restart()215 private void restart() 216 { 217 stop(); 218 start(); 219 } 220 listen()221 private void listen() 222 { 223 // bounce listen to ensure it is never added to the browser's internal 224 // list of requests bound to the current page load. being on this list 225 // (at least in webkit, perhaps in others) results in at least 2 and 226 // perhaps other problems: 227 // 228 // 1) perpetual "Loading..." indicator displayed to user (user can 229 // also then "cancel" the event request!); and 230 // 231 // 2) termination of the request without warning by the browser when 232 // the user hits the Back button within a frame hosted on the page 233 // (note in this case we get no error so think the request is still 234 // running -- see Watchdog for workaround to this general class of 235 // issues) 236 237 // determine bounce ms (do a bigger bounce for the second listen 238 // request as this is the one which gets us stuck in "perpetual loading") 239 int bounceMs = 1; 240 if (++listenCount_ == 2) 241 bounceMs = kSecondListenBounceMs; 242 243 listenTimer_.schedule(bounceMs); 244 } 245 doListen()246 private void doListen() 247 { 248 // abort if we are no longer running 249 if (!isListening_) 250 return; 251 252 // setup request callback (save reference for cancellation) 253 activeRequestCallback_ = new ServerRequestCallback<JsArray<ClientEvent>>() 254 { 255 @Override 256 public void onResponseReceived(JsArray<ClientEvent> events) 257 { 258 // keep watchdog appraised of successful receipt of events 259 watchdog_.cancel(); 260 261 // if we were cancelled (such as if we called stop), do not attempt to process the events 262 // and do not attempt to start listening again (until an explicit call to start is made) 263 if (cancelled()) 264 return; 265 266 try 267 { 268 // only process events if we are still listening 269 if (isListening_ && (events != null)) 270 { 271 for (int i=0; i<events.length(); i++) 272 { 273 // we can stop listening in the middle of dispatching 274 // events (e.g. if we dispatch a Suicide event) so we 275 // need to check the listening_ flag before each event 276 // is dispatched 277 if (!isListening_) 278 return; 279 280 // dispatch event 281 ClientEvent event = events.get(i); 282 dispatchEvent(event); 283 lastEventId_ = event.getId(); 284 } 285 } 286 } 287 // catch all here to make sure that in all cases we call 288 // listen() again after processing 289 catch(Throwable e) 290 { 291 GWT.log("ERROR: Processing client events", e); 292 } 293 294 // listen for more events 295 listen(); 296 } 297 298 @Override 299 public void onError(ServerError error) 300 { 301 // stop listening for events 302 stop(); 303 304 // if this was server unavailable then signal event and return 305 if (error.getCode() == ServerError.UNAVAILABLE) 306 { 307 ServerUnavailableEvent event = new ServerUnavailableEvent(); 308 server_.getEventBus().fireEvent(event); 309 return; 310 } 311 312 // attempt to restart listening, but throttle restart attempts 313 // in both timing (500ms delay) and quantity (no more than 5 314 // attempts). We do this because unthrottled restart attempts could 315 // result in our server getting hammered with requests) 316 if (listenErrorCount_++ <= 5) 317 { 318 Timer startTimer = new Timer() { 319 @Override 320 public void run() 321 { 322 // only start again if we haven't been started 323 // by some other means (e.g. ensureListening, etc) 324 if (!isListening_) 325 start(); 326 } 327 }; 328 startTimer.schedule(500); 329 } 330 // otherwise reset the listen error count and remain stopped 331 else 332 { 333 listenErrorCount_ = 0; 334 } 335 } 336 }; 337 338 // retry handler (restart listener) 339 RetryHandler retryHandler = new RetryHandler() { 340 341 public void onRetry() 342 { 343 // need to do a full restart to ensure that the existing 344 // activeRequest_ and activeRequestCallback_ are cleaned up 345 // and all state is reset correctly 346 restart(); 347 } 348 349 public void onModifiedRetry(RpcRequest modifiedRequest) 350 { 351 restart(); 352 } 353 354 public void onError(RpcError error) 355 { 356 // error while attempting to recover, to be on the safe side 357 // we simply stop listening for events. if rather than stopping 358 // we restarted we would open ourselves up to a situation 359 // where we keep hitting the same error over and over again. 360 stop(); 361 } 362 }; 363 364 // bump the watchdog timer if it's running 365 if (watchdog_.isRunning()) 366 watchdog_.schedule(kWatchdogIntervalMs); 367 368 // send request 369 activeRequest_ = server_.getEvents(lastEventId_, 370 activeRequestCallback_, 371 retryHandler); 372 } 373 374 dispatchEvent(ClientEvent event)375 private void dispatchEvent(ClientEvent event) 376 { 377 // do some special handling before calling the standard dispatcher 378 String type = event.getType(); 379 380 // we handle async completions directly 381 if (type == ClientEvent.AsyncCompletion) 382 { 383 AsyncCompletion completion = event.getData(); 384 String handle = completion.getHandle(); 385 AsyncRequestInfo req = asyncRequests_.remove(handle); 386 if (req != null) 387 { 388 req.callback.onResponseReceived(req.request, 389 completion.getResponse()); 390 } 391 else 392 { 393 // We haven't seen this request yet. Store it for later, 394 // maybe it's just taking a long time for the request 395 // to complete. 396 asyncResponses_.put(handle, completion.getResponse()); 397 } 398 } 399 else 400 { 401 // if there is a quit event then we set an internal flag to avoid 402 // ensureListening/ensureEvents calls trying to spark the event 403 // stream back up after the user has quit 404 if (type == ClientEvent.Quit) 405 sessionWasQuit_ = true; 406 407 // perform standard handling 408 eventDispatcher_.enqueEvent(event); 409 410 // allow any external handler registered to see the event 411 if (externalEventHandler_ != null) 412 externalEventHandler_.onClientEvent(event); 413 } 414 415 } 416 417 // NOTE: the design of the Watchdog likely results in more restarts of 418 // the event service than is optimal. when an rpc call reports that 419 // events are pending and the Watchdog is invoked it is very likely 420 // that the events have already been delivered in response to the 421 // previous poll. In this case the Watchdog "misses" those events which 422 // were already delivered and subsequently assumes that the service 423 // needs to be restarted 424 425 private class Watchdog extends Timer 426 { 427 @Override run()428 public void run() 429 { 430 try 431 { 432 // ensure that the workbench wasn't closed while we 433 // were waiting for the timer to run 434 if (!sessionWasQuit_) 435 restart(); 436 } 437 catch(Throwable e) 438 { 439 GWT.log("Error restarting event source", e); 440 } 441 } 442 } 443 registerAsyncHandle(String asyncHandle, RpcRequest request, RpcRequestCallback callback)444 public void registerAsyncHandle(String asyncHandle, 445 RpcRequest request, 446 RpcRequestCallback callback) 447 { 448 RpcResponse response = asyncResponses_.remove(asyncHandle); 449 if (response == null) 450 { 451 // We don't have the response for this request--this is 452 // the normal case. 453 asyncRequests_.put(asyncHandle, 454 new AsyncRequestInfo(request, callback)); 455 } 456 else 457 { 458 // We already have the response--the request must've taken 459 // a long time to return. 460 callback.onResponseReceived(request, response); 461 } 462 } 463 464 private final RemoteServer server_; 465 466 // note: kSecondListenDelayMs must be less than kWatchdogIntervalMs 467 // (by a reasonable margin) to void the watchdog getting involved 468 // unnecessarily during a listen delay 469 private final int kWatchdogIntervalMs = 1000; 470 private final int kSecondListenBounceMs = 250; 471 private Timer listenTimer_; 472 473 private boolean isListening_; 474 private int lastEventId_; 475 private int listenCount_; 476 private int listenErrorCount_; 477 private boolean sessionWasQuit_; 478 479 private RpcRequest activeRequest_; 480 private ServerRequestCallback<JsArray<ClientEvent>> activeRequestCallback_; 481 482 private final ClientEventDispatcher eventDispatcher_; 483 484 private final ClientEventHandler externalEventHandler_; 485 486 private Watchdog watchdog_ = new Watchdog(); 487 488 // Stores async requests that expect to be completed later. 489 private final HashMap<String, AsyncRequestInfo> asyncRequests_ = new HashMap<>(); 490 491 // Stores any async responses that didn't have matching requests at the 492 // time they were received. This is to deal with any race conditions where 493 // the completion occurs before we even finished making the request. 494 private final HashMap<String, RpcResponse> asyncResponses_ = new HashMap<>(); 495 } 496