1 /*
2  * Copyright (C) 2021 Finn Herzfeld
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 package io.finn.signald;
19 
20 import io.reactivex.rxjava3.schedulers.Schedulers;
21 import java.util.Arrays;
22 import java.util.concurrent.TimeUnit;
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
25 import org.whispersystems.libsignal.util.guava.Preconditions;
26 import org.whispersystems.signalservice.api.SignalWebSocket;
27 import org.whispersystems.signalservice.api.util.SleepTimer;
28 import org.whispersystems.signalservice.api.websocket.HealthMonitor;
29 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
30 import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
31 
32 public final class SignalWebSocketHealthMonitor implements HealthMonitor {
33 
34   private final static Logger logger = LogManager.getLogger();
35 
36   private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS);
37   private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3;
38 
39   private SignalWebSocket signalWebSocket;
40   private final SleepTimer sleepTimer;
41 
42   private volatile KeepAliveSender keepAliveSender;
43 
44   private final HealthState identified = new HealthState();
45   private final HealthState unidentified = new HealthState();
46 
SignalWebSocketHealthMonitor(SleepTimer sleepTimer)47   public SignalWebSocketHealthMonitor(SleepTimer sleepTimer) { this.sleepTimer = sleepTimer; }
48 
monitor(SignalWebSocket signalWebSocket)49   public void monitor(SignalWebSocket signalWebSocket) {
50     Preconditions.checkNotNull(signalWebSocket);
51     Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once");
52 
53     this.signalWebSocket = signalWebSocket;
54 
55     // noinspection ResultOfMethodCallIgnored
56     signalWebSocket.getWebSocketState()
57         .subscribeOn(Schedulers.computation())
58         .observeOn(Schedulers.computation())
59         .distinctUntilChanged()
60         .subscribe(s -> onStateChange(s, identified));
61 
62     // noinspection ResultOfMethodCallIgnored
63     signalWebSocket.getUnidentifiedWebSocketState()
64         .subscribeOn(Schedulers.computation())
65         .observeOn(Schedulers.computation())
66         .distinctUntilChanged()
67         .subscribe(s -> onStateChange(s, unidentified));
68   }
69 
onStateChange(WebSocketConnectionState connectionState, HealthState healthState)70   private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) {
71     switch (connectionState) {
72     case CONNECTED:
73       logger.debug("WebSocket is now connected");
74       break;
75     case AUTHENTICATION_FAILED:
76       logger.debug("WebSocket authentication failed");
77       break;
78     case FAILED:
79       logger.debug("WebSocket connection failed");
80       break;
81     }
82 
83     healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED;
84 
85     if (keepAliveSender == null && isKeepAliveNecessary()) {
86       keepAliveSender = new KeepAliveSender();
87       keepAliveSender.start();
88     } else if (keepAliveSender != null && !isKeepAliveNecessary()) {
89       keepAliveSender.shutdown();
90       keepAliveSender = null;
91     }
92   }
93 
94   @Override
onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket)95   public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) {
96     if (isIdentifiedWebSocket) {
97       identified.lastKeepAliveReceived = System.currentTimeMillis();
98     } else {
99       unidentified.lastKeepAliveReceived = System.currentTimeMillis();
100     }
101   }
102 
103   @Override
onMessageError(int status, boolean isIdentifiedWebSocket)104   public void onMessageError(int status, boolean isIdentifiedWebSocket) {
105     if (status == 409) {
106       HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified);
107       if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) {
108         logger.warn("Received too many mismatch device errors, forcing new websockets.");
109         signalWebSocket.forceNewWebSockets();
110       }
111     }
112   }
113 
isKeepAliveNecessary()114   private boolean isKeepAliveNecessary() { return identified.needsKeepAlive || unidentified.needsKeepAlive; }
115 
116   private static class HealthState {
117 
118     private final HttpErrorTracker mismatchErrorTracker = new HttpErrorTracker(5, TimeUnit.MINUTES.toMillis(1));
119 
120     private volatile boolean needsKeepAlive;
121     private volatile long lastKeepAliveReceived;
122   }
123 
124   /**
125    * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If
126    * either WebSocket fails 3 times to get a return heartbeat both are forced to be recreated.
127    */
128   private class KeepAliveSender extends Thread {
129 
130     private volatile boolean shouldKeepRunning = true;
131 
run()132     public void run() {
133       identified.lastKeepAliveReceived = System.currentTimeMillis();
134       unidentified.lastKeepAliveReceived = System.currentTimeMillis();
135 
136       while (shouldKeepRunning && isKeepAliveNecessary()) {
137         try {
138           sleepTimer.sleep(KEEP_ALIVE_SEND_CADENCE);
139 
140           if (shouldKeepRunning && isKeepAliveNecessary()) {
141             long keepAliveRequiredSinceTime = System.currentTimeMillis() - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE;
142 
143             if (identified.lastKeepAliveReceived < keepAliveRequiredSinceTime || unidentified.lastKeepAliveReceived < keepAliveRequiredSinceTime) {
144               logger.warn("Missed keep alives, identified last: " + identified.lastKeepAliveReceived + " unidentified last: " + unidentified.lastKeepAliveReceived +
145                           " needed by: " + keepAliveRequiredSinceTime);
146               signalWebSocket.forceNewWebSockets();
147             } else {
148               signalWebSocket.sendKeepAlive();
149             }
150           }
151         } catch (Throwable e) {
152           logger.warn("Error occured in KeepAliveSender, ignoring ...", e);
153         }
154       }
155     }
156 
shutdown()157     public void shutdown() { shouldKeepRunning = false; }
158   }
159 
160   private final static class HttpErrorTracker {
161 
162     private final long[] timestamps;
163     private final long errorTimeRange;
164 
HttpErrorTracker(int samples, long errorTimeRange)165     public HttpErrorTracker(int samples, long errorTimeRange) {
166       this.timestamps = new long[samples];
167       this.errorTimeRange = errorTimeRange;
168     }
169 
addSample(long now)170     public synchronized boolean addSample(long now) {
171       long errorsMustBeAfter = now - errorTimeRange;
172       int count = 1;
173       int minIndex = 0;
174 
175       for (int i = 0; i < timestamps.length; i++) {
176         if (timestamps[i] < errorsMustBeAfter) {
177           timestamps[i] = 0;
178         } else if (timestamps[i] != 0) {
179           count++;
180         }
181 
182         if (timestamps[i] < timestamps[minIndex]) {
183           minIndex = i;
184         }
185       }
186 
187       timestamps[minIndex] = now;
188 
189       if (count >= timestamps.length) {
190         Arrays.fill(timestamps, 0);
191         return true;
192       }
193       return false;
194     }
195   }
196 }