1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_ 6 #define GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_ 7 8 #include <stdint.h> 9 10 #include <map> 11 #include <memory> 12 #include <string> 13 #include <vector> 14 15 #include "base/containers/circular_deque.h" 16 #include "base/files/file_path.h" 17 #include "base/macros.h" 18 #include "base/memory/weak_ptr.h" 19 #include "google_apis/gcm/base/gcm_export.h" 20 #include "google_apis/gcm/base/mcs_message.h" 21 #include "google_apis/gcm/engine/connection_factory.h" 22 #include "google_apis/gcm/engine/connection_handler.h" 23 #include "google_apis/gcm/engine/gcm_store.h" 24 #include "google_apis/gcm/engine/heartbeat_manager.h" 25 26 namespace base { 27 class Clock; 28 class RetainingOneShotTimer; 29 } // namespace base 30 31 namespace google { 32 namespace protobuf { 33 class MessageLite; 34 } // namespace protobuf 35 } // namespace google 36 37 namespace mcs_proto { 38 class LoginRequest; 39 } 40 41 namespace gcm { 42 43 class CollapseKey; 44 class ConnectionFactory; 45 class GCMStatsRecorder; 46 struct ReliablePacketInfo; 47 48 // An MCS client. This client is in charge of all communications with an 49 // MCS endpoint, and is capable of reliably sending/receiving GCM messages. 50 // NOTE: Not thread safe. This class should live on the same thread as that 51 // network requests are performed on. 52 class GCM_EXPORT MCSClient { 53 public: 54 // Any change made to this enum should have corresponding change in the 55 // GetStateString(...) function. 56 enum State { 57 UNINITIALIZED, // Uninitialized. 58 LOADED, // GCM Load finished, waiting to connect. 59 CONNECTING, // Connection in progress. 60 CONNECTED, // Connected and running. 61 }; 62 63 // Any change made to this enum should have corresponding change in the 64 // GetMessageSendStatusString(...) function in mcs_client.cc. 65 enum MessageSendStatus { 66 // Message was queued successfully. 67 QUEUED, 68 // Message was sent to the server and the ACK was received. 69 SENT, 70 // Message not saved, because total queue size limit reached. 71 QUEUE_SIZE_LIMIT_REACHED, 72 // Message not saved, because app queue size limit reached. 73 APP_QUEUE_SIZE_LIMIT_REACHED, 74 // Message too large to send. 75 MESSAGE_TOO_LARGE, 76 // Message not send becuase of TTL = 0 and no working connection. 77 NO_CONNECTION_ON_ZERO_TTL, 78 // Message exceeded TTL. 79 TTL_EXCEEDED, 80 81 // NOTE: always keep this entry at the end. Add new status types only 82 // immediately above this line. Make sure to update the corresponding 83 // histogram enum accordingly. 84 SEND_STATUS_COUNT 85 }; 86 87 // Callback for MCSClient's error conditions. A repeating callback is used 88 // because occasionally multiple errors are reported, see crbug.com/1039598 89 // for more context. 90 // TODO(fgorski): Keeping it as a callback with intention to add meaningful 91 // error information. 92 using ErrorCallback = base::RepeatingClosure; 93 // Callback when a message is received. 94 using OnMessageReceivedCallback = 95 base::RepeatingCallback<void(const MCSMessage& message)>; 96 // Callback when a message is sent (and receipt has been acknowledged by 97 // the MCS endpoint). 98 using OnMessageSentCallback = 99 base::RepeatingCallback<void(int64_t user_serial_number, 100 const std::string& app_id, 101 const std::string& message_id, 102 MessageSendStatus status)>; 103 104 MCSClient(const std::string& version_string, 105 base::Clock* clock, 106 ConnectionFactory* connection_factory, 107 GCMStore* gcm_store, 108 scoped_refptr<base::SequencedTaskRunner> io_task_runner, 109 GCMStatsRecorder* recorder); 110 virtual ~MCSClient(); 111 112 // Initialize the client. Will load any previous id/token information as well 113 // as unacknowledged message information from the GCM storage, if it exists, 114 // passing the id/token information back via |initialization_callback| along 115 // with a |success == true| result. If no GCM information is present (and 116 // this is therefore a fresh client), a clean GCM store will be created and 117 // values of 0 will be returned via |initialization_callback| with 118 // |success == true|. 119 // If an error loading the GCM store is encountered, 120 // |initialization_callback| will be invoked with |success == false|. 121 void Initialize(const ErrorCallback& initialization_callback, 122 const OnMessageReceivedCallback& message_received_callback, 123 const OnMessageSentCallback& message_sent_callback, 124 std::unique_ptr<GCMStore::LoadResult> load_result); 125 126 // Logs the client into the server. Client must be initialized. 127 // |android_id| and |security_token| are optional if this is not a new 128 // client, else they must be non-zero. 129 // Successful login will result in |message_received_callback| being invoked 130 // with a valid LoginResponse. 131 // Login failure (typically invalid id/token) will shut down the client, and 132 // |initialization_callback| to be invoked with |success = false|. 133 virtual void Login(uint64_t android_id, uint64_t security_token); 134 135 // Sends a message, with or without reliable message queueing (RMQ) support. 136 // Will asynchronously invoke the OnMessageSent callback regardless. 137 // Whether to use RMQ depends on whether the protobuf has |ttl| set or not. 138 // |ttl == 0| denotes the message should only be sent if the connection is 139 // open. |ttl > 0| will keep the message saved for |ttl| seconds, after which 140 // it will be dropped if it was unable to be sent. When a message is dropped, 141 // |message_sent_callback_| is invoked with a TTL expiration error. 142 virtual void SendMessage(const MCSMessage& message); 143 144 // Returns the current state of the client. state()145 State state() const { return state_; } 146 147 // Returns the size of the send message queue. 148 int GetSendQueueSize() const; 149 150 // Returns the size of the resend messaage queue. 151 int GetResendQueueSize() const; 152 153 // Returns text representation of the state enum. 154 std::string GetStateString() const; 155 156 // Updates the timer used by |heartbeat_manager_| for sending heartbeats. 157 void UpdateHeartbeatTimer(std::unique_ptr<base::RetainingOneShotTimer> timer); 158 159 // Allows a caller to set a heartbeat interval (in milliseconds) with which 160 // the MCS connection will be monitored on both ends, to detect device 161 // presence. In case the newly set interval is smaller than the current one, 162 // connection will be restarted with new heartbeat interval. Valid values have 163 // to be between GetMax/GetMinClientHeartbeatIntervalMs of HeartbeatManager, 164 // otherwise the setting won't take effect. 165 void AddHeartbeatInterval(const std::string& scope, int interval_ms); 166 void RemoveHeartbeatInterval(const std::string& scope); 167 GetHeartbeatManagerForTesting()168 HeartbeatManager* GetHeartbeatManagerForTesting() { 169 return &heartbeat_manager_; 170 } 171 172 private: 173 using StreamId = uint32_t; 174 using PersistentId = std::string; 175 using StreamIdList = std::vector<StreamId>; 176 using PersistentIdList = std::vector<PersistentId>; 177 using StreamIdToPersistentIdMap = std::map<StreamId, PersistentId>; 178 using MCSPacketInternal = std::unique_ptr<ReliablePacketInfo>; 179 180 // Resets the internal state and builds a new login request, acknowledging 181 // any pending server-to-device messages and rebuilding the send queue 182 // from all unacknowledged device-to-server messages. 183 // Should only be called when the connection has been reset. 184 void ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest* request); 185 186 // Send a heartbeat to the MCS server. 187 void SendHeartbeat(); 188 189 // GCM Store callback. 190 void OnGCMUpdateFinished(bool success); 191 192 // Attempt to send a message. 193 void MaybeSendMessage(); 194 195 // Helper for sending a protobuf along with any unacknowledged ids to the 196 // wire. 197 void SendPacketToWire(ReliablePacketInfo* packet_info); 198 199 // Handle a data message sent to the MCS client system from the MCS server. 200 void HandleMCSDataMesssage( 201 std::unique_ptr<google::protobuf::MessageLite> protobuf); 202 203 // Handle a packet received over the wire. 204 void HandlePacketFromWire( 205 std::unique_ptr<google::protobuf::MessageLite> protobuf); 206 207 // ReliableMessageQueue acknowledgment helpers. 208 // Handle a StreamAck sent by the server confirming receipt of all 209 // messages up to the message with stream id |last_stream_id_received|. 210 void HandleStreamAck(StreamId last_stream_id_received_); 211 // Handle a SelectiveAck sent by the server confirming all messages 212 // in |id_list|. 213 void HandleSelectiveAck(const PersistentIdList& id_list); 214 // Handle server confirmation of a device message, including device's 215 // acknowledgment of receipt of messages. 216 void HandleServerConfirmedReceipt(StreamId device_stream_id); 217 218 // Generates a new persistent id for messages. 219 // Virtual for testing. 220 virtual PersistentId GetNextPersistentId(); 221 222 // Helper for the heartbeat manager to signal a connection reset. 223 void OnConnectionResetByHeartbeat( 224 ConnectionFactory::ConnectionResetReason reason); 225 226 // Runs the message_sent_callback_ with send |status| of the |protobuf|. 227 void NotifyMessageSendStatus(const google::protobuf::MessageLite& protobuf, 228 MessageSendStatus status); 229 230 // Pops the next message from the front of the send queue (cleaning up 231 // any associated state). 232 MCSPacketInternal PopMessageForSend(); 233 234 // Gets the minimum interval from the map of scopes to intervals in 235 // milliseconds. 236 int GetMinHeartbeatIntervalMs(); 237 238 // Local version string. Sent on login. 239 const std::string version_string_; 240 241 // Clock for enforcing TTL. Passed in for testing. 242 base::Clock* const clock_; 243 244 // Client state. 245 State state_; 246 247 // Callbacks for owner. 248 ErrorCallback mcs_error_callback_; 249 OnMessageReceivedCallback message_received_callback_; 250 OnMessageSentCallback message_sent_callback_; 251 252 // The android id and security token in use by this device. 253 uint64_t android_id_; 254 uint64_t security_token_; 255 256 // Factory for creating new connections and connection handlers. 257 ConnectionFactory* connection_factory_; 258 259 // Connection handler to handle all over-the-wire protocol communication 260 // with the mobile connection server. 261 ConnectionHandler* connection_handler_; 262 263 // ----- Reliablie Message Queue section ----- 264 // Note: all queues/maps are ordered from oldest (front/begin) message to 265 // most recent (back/end). 266 267 // Send/acknowledge queues. 268 base::circular_deque<MCSPacketInternal> to_send_; 269 base::circular_deque<MCSPacketInternal> to_resend_; 270 271 // Map of collapse keys to their pending messages. 272 std::map<CollapseKey, ReliablePacketInfo*> collapse_key_map_; 273 274 // Last device_to_server stream id acknowledged by the server. 275 StreamId last_device_to_server_stream_id_received_; 276 // Last server_to_device stream id acknowledged by this device. 277 StreamId last_server_to_device_stream_id_received_; 278 // The stream id for the last sent message. A new message should consume 279 // stream_id_out_ + 1. 280 StreamId stream_id_out_; 281 // The stream id of the last received message. The LoginResponse will always 282 // have a stream id of 1, and stream ids increment by 1 for each received 283 // message. 284 StreamId stream_id_in_; 285 286 // The server messages that have not been acked by the device yet. Keyed by 287 // server stream id. 288 StreamIdToPersistentIdMap unacked_server_ids_; 289 290 // Those server messages that have been acked. They must remain tracked 291 // until the ack message is itself confirmed. The list of all message ids 292 // acknowledged are keyed off the device stream id of the message that 293 // acknowledged them. 294 std::map<StreamId, PersistentIdList> acked_server_ids_; 295 296 // Those server messages from a previous connection that were not fully 297 // acknowledged. They do not have associated stream ids, and will be 298 // acknowledged on the next login attempt. 299 PersistentIdList restored_unackeds_server_ids_; 300 301 // The GCM persistent store. Not owned. 302 GCMStore* gcm_store_; 303 304 const scoped_refptr<base::SequencedTaskRunner> io_task_runner_; 305 306 // Manager to handle triggering/detecting heartbeats. 307 HeartbeatManager heartbeat_manager_; 308 309 // Custom heartbeat intervals requested by different components. 310 std::map<std::string, int> custom_heartbeat_intervals_; 311 312 // Recorder that records GCM activities for debugging purpose. Not owned. 313 GCMStatsRecorder* recorder_; 314 315 base::WeakPtrFactory<MCSClient> weak_ptr_factory_{this}; 316 317 DISALLOW_COPY_AND_ASSIGN(MCSClient); 318 }; 319 320 } // namespace gcm 321 322 #endif // GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_ 323