1 /* 2 * Copyright (C) 2021 Finn Herzfeld 3 * 4 * This program is free software: you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation, either version 3 of the License, or 7 * (at your option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * GNU General Public License for more details. 13 * 14 * You should have received a copy of the GNU General Public License 15 * along with this program. If not, see <http://www.gnu.org/licenses/>. 16 */ 17 18 package io.finn.signald; 19 20 import io.reactivex.rxjava3.schedulers.Schedulers; 21 import java.util.Arrays; 22 import java.util.concurrent.TimeUnit; 23 import org.apache.logging.log4j.LogManager; 24 import org.apache.logging.log4j.Logger; 25 import org.whispersystems.libsignal.util.guava.Preconditions; 26 import org.whispersystems.signalservice.api.SignalWebSocket; 27 import org.whispersystems.signalservice.api.util.SleepTimer; 28 import org.whispersystems.signalservice.api.websocket.HealthMonitor; 29 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; 30 import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; 31 32 public final class SignalWebSocketHealthMonitor implements HealthMonitor { 33 34 private final static Logger logger = LogManager.getLogger(); 35 36 private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS); 37 private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3; 38 39 private SignalWebSocket signalWebSocket; 40 private final SleepTimer sleepTimer; 41 42 private volatile KeepAliveSender keepAliveSender; 43 44 private final HealthState identified = new HealthState(); 45 private final HealthState unidentified = new HealthState(); 46 SignalWebSocketHealthMonitor(SleepTimer sleepTimer)47 public SignalWebSocketHealthMonitor(SleepTimer sleepTimer) { this.sleepTimer = sleepTimer; } 48 monitor(SignalWebSocket signalWebSocket)49 public void monitor(SignalWebSocket signalWebSocket) { 50 Preconditions.checkNotNull(signalWebSocket); 51 Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once"); 52 53 this.signalWebSocket = signalWebSocket; 54 55 // noinspection ResultOfMethodCallIgnored 56 signalWebSocket.getWebSocketState() 57 .subscribeOn(Schedulers.computation()) 58 .observeOn(Schedulers.computation()) 59 .distinctUntilChanged() 60 .subscribe(s -> onStateChange(s, identified)); 61 62 // noinspection ResultOfMethodCallIgnored 63 signalWebSocket.getUnidentifiedWebSocketState() 64 .subscribeOn(Schedulers.computation()) 65 .observeOn(Schedulers.computation()) 66 .distinctUntilChanged() 67 .subscribe(s -> onStateChange(s, unidentified)); 68 } 69 onStateChange(WebSocketConnectionState connectionState, HealthState healthState)70 private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) { 71 switch (connectionState) { 72 case CONNECTED: 73 logger.debug("WebSocket is now connected"); 74 break; 75 case AUTHENTICATION_FAILED: 76 logger.debug("WebSocket authentication failed"); 77 break; 78 case FAILED: 79 logger.debug("WebSocket connection failed"); 80 break; 81 } 82 83 healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED; 84 85 if (keepAliveSender == null && isKeepAliveNecessary()) { 86 keepAliveSender = new KeepAliveSender(); 87 keepAliveSender.start(); 88 } else if (keepAliveSender != null && !isKeepAliveNecessary()) { 89 keepAliveSender.shutdown(); 90 keepAliveSender = null; 91 } 92 } 93 94 @Override onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket)95 public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) { 96 if (isIdentifiedWebSocket) { 97 identified.lastKeepAliveReceived = System.currentTimeMillis(); 98 } else { 99 unidentified.lastKeepAliveReceived = System.currentTimeMillis(); 100 } 101 } 102 103 @Override onMessageError(int status, boolean isIdentifiedWebSocket)104 public void onMessageError(int status, boolean isIdentifiedWebSocket) { 105 if (status == 409) { 106 HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified); 107 if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) { 108 logger.warn("Received too many mismatch device errors, forcing new websockets."); 109 signalWebSocket.forceNewWebSockets(); 110 } 111 } 112 } 113 isKeepAliveNecessary()114 private boolean isKeepAliveNecessary() { return identified.needsKeepAlive || unidentified.needsKeepAlive; } 115 116 private static class HealthState { 117 118 private final HttpErrorTracker mismatchErrorTracker = new HttpErrorTracker(5, TimeUnit.MINUTES.toMillis(1)); 119 120 private volatile boolean needsKeepAlive; 121 private volatile long lastKeepAliveReceived; 122 } 123 124 /** 125 * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If 126 * either WebSocket fails 3 times to get a return heartbeat both are forced to be recreated. 127 */ 128 private class KeepAliveSender extends Thread { 129 130 private volatile boolean shouldKeepRunning = true; 131 run()132 public void run() { 133 identified.lastKeepAliveReceived = System.currentTimeMillis(); 134 unidentified.lastKeepAliveReceived = System.currentTimeMillis(); 135 136 while (shouldKeepRunning && isKeepAliveNecessary()) { 137 try { 138 sleepTimer.sleep(KEEP_ALIVE_SEND_CADENCE); 139 140 if (shouldKeepRunning && isKeepAliveNecessary()) { 141 long keepAliveRequiredSinceTime = System.currentTimeMillis() - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE; 142 143 if (identified.lastKeepAliveReceived < keepAliveRequiredSinceTime || unidentified.lastKeepAliveReceived < keepAliveRequiredSinceTime) { 144 logger.warn("Missed keep alives, identified last: " + identified.lastKeepAliveReceived + " unidentified last: " + unidentified.lastKeepAliveReceived + 145 " needed by: " + keepAliveRequiredSinceTime); 146 signalWebSocket.forceNewWebSockets(); 147 } else { 148 signalWebSocket.sendKeepAlive(); 149 } 150 } 151 } catch (Throwable e) { 152 logger.warn("Error occured in KeepAliveSender, ignoring ...", e); 153 } 154 } 155 } 156 shutdown()157 public void shutdown() { shouldKeepRunning = false; } 158 } 159 160 private final static class HttpErrorTracker { 161 162 private final long[] timestamps; 163 private final long errorTimeRange; 164 HttpErrorTracker(int samples, long errorTimeRange)165 public HttpErrorTracker(int samples, long errorTimeRange) { 166 this.timestamps = new long[samples]; 167 this.errorTimeRange = errorTimeRange; 168 } 169 addSample(long now)170 public synchronized boolean addSample(long now) { 171 long errorsMustBeAfter = now - errorTimeRange; 172 int count = 1; 173 int minIndex = 0; 174 175 for (int i = 0; i < timestamps.length; i++) { 176 if (timestamps[i] < errorsMustBeAfter) { 177 timestamps[i] = 0; 178 } else if (timestamps[i] != 0) { 179 count++; 180 } 181 182 if (timestamps[i] < timestamps[minIndex]) { 183 minIndex = i; 184 } 185 } 186 187 timestamps[minIndex] = now; 188 189 if (count >= timestamps.length) { 190 Arrays.fill(timestamps, 0); 191 return true; 192 } 193 return false; 194 } 195 } 196 }