1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
6 #define GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
7 
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <string>
13 #include <vector>
14 
15 #include "base/containers/circular_deque.h"
16 #include "base/files/file_path.h"
17 #include "base/macros.h"
18 #include "base/memory/weak_ptr.h"
19 #include "google_apis/gcm/base/gcm_export.h"
20 #include "google_apis/gcm/base/mcs_message.h"
21 #include "google_apis/gcm/engine/connection_factory.h"
22 #include "google_apis/gcm/engine/connection_handler.h"
23 #include "google_apis/gcm/engine/gcm_store.h"
24 #include "google_apis/gcm/engine/heartbeat_manager.h"
25 
26 namespace base {
27 class Clock;
28 class RetainingOneShotTimer;
29 }  // namespace base
30 
31 namespace google {
32 namespace protobuf {
33 class MessageLite;
34 }  // namespace protobuf
35 }  // namespace google
36 
37 namespace mcs_proto {
38 class LoginRequest;
39 }
40 
41 namespace gcm {
42 
43 class CollapseKey;
44 class ConnectionFactory;
45 class GCMStatsRecorder;
46 struct ReliablePacketInfo;
47 
48 // An MCS client. This client is in charge of all communications with an
49 // MCS endpoint, and is capable of reliably sending/receiving GCM messages.
50 // NOTE: Not thread safe. This class should live on the same thread as that
51 // network requests are performed on.
52 class GCM_EXPORT MCSClient {
53  public:
54   // Any change made to this enum should have corresponding change in the
55   // GetStateString(...) function.
56   enum State {
57     UNINITIALIZED,  // Uninitialized.
58     LOADED,         // GCM Load finished, waiting to connect.
59     CONNECTING,     // Connection in progress.
60     CONNECTED,      // Connected and running.
61   };
62 
63   // Any change made to this enum should have corresponding change in the
64   // GetMessageSendStatusString(...) function in mcs_client.cc.
65   enum MessageSendStatus {
66     // Message was queued successfully.
67     QUEUED,
68     // Message was sent to the server and the ACK was received.
69     SENT,
70     // Message not saved, because total queue size limit reached.
71     QUEUE_SIZE_LIMIT_REACHED,
72     // Message not saved, because app queue size limit reached.
73     APP_QUEUE_SIZE_LIMIT_REACHED,
74     // Message too large to send.
75     MESSAGE_TOO_LARGE,
76     // Message not send becuase of TTL = 0 and no working connection.
77     NO_CONNECTION_ON_ZERO_TTL,
78     // Message exceeded TTL.
79     TTL_EXCEEDED,
80 
81     // NOTE: always keep this entry at the end. Add new status types only
82     // immediately above this line. Make sure to update the corresponding
83     // histogram enum accordingly.
84     SEND_STATUS_COUNT
85   };
86 
87   // Callback for MCSClient's error conditions. A repeating callback is used
88   // because occasionally multiple errors are reported, see crbug.com/1039598
89   // for more context.
90   // TODO(fgorski): Keeping it as a callback with intention to add meaningful
91   // error information.
92   using ErrorCallback = base::RepeatingClosure;
93   // Callback when a message is received.
94   using OnMessageReceivedCallback =
95       base::RepeatingCallback<void(const MCSMessage& message)>;
96   // Callback when a message is sent (and receipt has been acknowledged by
97   // the MCS endpoint).
98   using OnMessageSentCallback =
99       base::RepeatingCallback<void(int64_t user_serial_number,
100                                    const std::string& app_id,
101                                    const std::string& message_id,
102                                    MessageSendStatus status)>;
103 
104   MCSClient(const std::string& version_string,
105             base::Clock* clock,
106             ConnectionFactory* connection_factory,
107             GCMStore* gcm_store,
108             scoped_refptr<base::SequencedTaskRunner> io_task_runner,
109             GCMStatsRecorder* recorder);
110   virtual ~MCSClient();
111 
112   // Initialize the client. Will load any previous id/token information as well
113   // as unacknowledged message information from the GCM storage, if it exists,
114   // passing the id/token information back via |initialization_callback| along
115   // with a |success == true| result. If no GCM information is present (and
116   // this is therefore a fresh client), a clean GCM store will be created and
117   // values of 0 will be returned via |initialization_callback| with
118   // |success == true|.
119   // If an error loading the GCM store is encountered,
120   // |initialization_callback| will be invoked with |success == false|.
121   void Initialize(const ErrorCallback& initialization_callback,
122                   const OnMessageReceivedCallback& message_received_callback,
123                   const OnMessageSentCallback& message_sent_callback,
124                   std::unique_ptr<GCMStore::LoadResult> load_result);
125 
126   // Logs the client into the server. Client must be initialized.
127   // |android_id| and |security_token| are optional if this is not a new
128   // client, else they must be non-zero.
129   // Successful login will result in |message_received_callback| being invoked
130   // with a valid LoginResponse.
131   // Login failure (typically invalid id/token) will shut down the client, and
132   // |initialization_callback| to be invoked with |success = false|.
133   virtual void Login(uint64_t android_id, uint64_t security_token);
134 
135   // Sends a message, with or without reliable message queueing (RMQ) support.
136   // Will asynchronously invoke the OnMessageSent callback regardless.
137   // Whether to use RMQ depends on whether the protobuf has |ttl| set or not.
138   // |ttl == 0| denotes the message should only be sent if the connection is
139   // open. |ttl > 0| will keep the message saved for |ttl| seconds, after which
140   // it will be dropped if it was unable to be sent. When a message is dropped,
141   // |message_sent_callback_| is invoked with a TTL expiration error.
142   virtual void SendMessage(const MCSMessage& message);
143 
144   // Returns the current state of the client.
state()145   State state() const { return state_; }
146 
147   // Returns the size of the send message queue.
148   int GetSendQueueSize() const;
149 
150   // Returns the size of the resend messaage queue.
151   int GetResendQueueSize() const;
152 
153   // Returns text representation of the state enum.
154   std::string GetStateString() const;
155 
156   // Updates the timer used by |heartbeat_manager_| for sending heartbeats.
157   void UpdateHeartbeatTimer(std::unique_ptr<base::RetainingOneShotTimer> timer);
158 
159   // Allows a caller to set a heartbeat interval (in milliseconds) with which
160   // the MCS connection will be monitored on both ends, to detect device
161   // presence. In case the newly set interval is smaller than the current one,
162   // connection will be restarted with new heartbeat interval. Valid values have
163   // to be between GetMax/GetMinClientHeartbeatIntervalMs of HeartbeatManager,
164   // otherwise the setting won't take effect.
165   void AddHeartbeatInterval(const std::string& scope, int interval_ms);
166   void RemoveHeartbeatInterval(const std::string& scope);
167 
GetHeartbeatManagerForTesting()168   HeartbeatManager* GetHeartbeatManagerForTesting() {
169     return &heartbeat_manager_;
170   }
171 
172  private:
173   using StreamId = uint32_t;
174   using PersistentId = std::string;
175   using StreamIdList = std::vector<StreamId>;
176   using PersistentIdList = std::vector<PersistentId>;
177   using StreamIdToPersistentIdMap = std::map<StreamId, PersistentId>;
178   using MCSPacketInternal = std::unique_ptr<ReliablePacketInfo>;
179 
180   // Resets the internal state and builds a new login request, acknowledging
181   // any pending server-to-device messages and rebuilding the send queue
182   // from all unacknowledged device-to-server messages.
183   // Should only be called when the connection has been reset.
184   void ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest* request);
185 
186   // Send a heartbeat to the MCS server.
187   void SendHeartbeat();
188 
189   // GCM Store callback.
190   void OnGCMUpdateFinished(bool success);
191 
192   // Attempt to send a message.
193   void MaybeSendMessage();
194 
195   // Helper for sending a protobuf along with any unacknowledged ids to the
196   // wire.
197   void SendPacketToWire(ReliablePacketInfo* packet_info);
198 
199   // Handle a data message sent to the MCS client system from the MCS server.
200   void HandleMCSDataMesssage(
201       std::unique_ptr<google::protobuf::MessageLite> protobuf);
202 
203   // Handle a packet received over the wire.
204   void HandlePacketFromWire(
205       std::unique_ptr<google::protobuf::MessageLite> protobuf);
206 
207   // ReliableMessageQueue acknowledgment helpers.
208   // Handle a StreamAck sent by the server confirming receipt of all
209   // messages up to the message with stream id |last_stream_id_received|.
210   void HandleStreamAck(StreamId last_stream_id_received_);
211   // Handle a SelectiveAck sent by the server confirming all messages
212   // in |id_list|.
213   void HandleSelectiveAck(const PersistentIdList& id_list);
214   // Handle server confirmation of a device message, including device's
215   // acknowledgment of receipt of messages.
216   void HandleServerConfirmedReceipt(StreamId device_stream_id);
217 
218   // Generates a new persistent id for messages.
219   // Virtual for testing.
220   virtual PersistentId GetNextPersistentId();
221 
222   // Helper for the heartbeat manager to signal a connection reset.
223   void OnConnectionResetByHeartbeat(
224       ConnectionFactory::ConnectionResetReason reason);
225 
226   // Runs the message_sent_callback_ with send |status| of the |protobuf|.
227   void NotifyMessageSendStatus(const google::protobuf::MessageLite& protobuf,
228                                MessageSendStatus status);
229 
230   // Pops the next message from the front of the send queue (cleaning up
231   // any associated state).
232   MCSPacketInternal PopMessageForSend();
233 
234   // Gets the minimum interval from the map of scopes to intervals in
235   // milliseconds.
236   int GetMinHeartbeatIntervalMs();
237 
238   // Local version string. Sent on login.
239   const std::string version_string_;
240 
241   // Clock for enforcing TTL. Passed in for testing.
242   base::Clock* const clock_;
243 
244   // Client state.
245   State state_;
246 
247   // Callbacks for owner.
248   ErrorCallback mcs_error_callback_;
249   OnMessageReceivedCallback message_received_callback_;
250   OnMessageSentCallback message_sent_callback_;
251 
252   // The android id and security token in use by this device.
253   uint64_t android_id_;
254   uint64_t security_token_;
255 
256   // Factory for creating new connections and connection handlers.
257   ConnectionFactory* connection_factory_;
258 
259   // Connection handler to handle all over-the-wire protocol communication
260   // with the mobile connection server.
261   ConnectionHandler* connection_handler_;
262 
263   // -----  Reliablie Message Queue section -----
264   // Note: all queues/maps are ordered from oldest (front/begin) message to
265   // most recent (back/end).
266 
267   // Send/acknowledge queues.
268   base::circular_deque<MCSPacketInternal> to_send_;
269   base::circular_deque<MCSPacketInternal> to_resend_;
270 
271   // Map of collapse keys to their pending messages.
272   std::map<CollapseKey, ReliablePacketInfo*> collapse_key_map_;
273 
274   // Last device_to_server stream id acknowledged by the server.
275   StreamId last_device_to_server_stream_id_received_;
276   // Last server_to_device stream id acknowledged by this device.
277   StreamId last_server_to_device_stream_id_received_;
278   // The stream id for the last sent message. A new message should consume
279   // stream_id_out_ + 1.
280   StreamId stream_id_out_;
281   // The stream id of the last received message. The LoginResponse will always
282   // have a stream id of 1, and stream ids increment by 1 for each received
283   // message.
284   StreamId stream_id_in_;
285 
286   // The server messages that have not been acked by the device yet. Keyed by
287   // server stream id.
288   StreamIdToPersistentIdMap unacked_server_ids_;
289 
290   // Those server messages that have been acked. They must remain tracked
291   // until the ack message is itself confirmed. The list of all message ids
292   // acknowledged are keyed off the device stream id of the message that
293   // acknowledged them.
294   std::map<StreamId, PersistentIdList> acked_server_ids_;
295 
296   // Those server messages from a previous connection that were not fully
297   // acknowledged. They do not have associated stream ids, and will be
298   // acknowledged on the next login attempt.
299   PersistentIdList restored_unackeds_server_ids_;
300 
301   // The GCM persistent store. Not owned.
302   GCMStore* gcm_store_;
303 
304   const scoped_refptr<base::SequencedTaskRunner> io_task_runner_;
305 
306   // Manager to handle triggering/detecting heartbeats.
307   HeartbeatManager heartbeat_manager_;
308 
309   // Custom heartbeat intervals requested by different components.
310   std::map<std::string, int> custom_heartbeat_intervals_;
311 
312   // Recorder that records GCM activities for debugging purpose. Not owned.
313   GCMStatsRecorder* recorder_;
314 
315   base::WeakPtrFactory<MCSClient> weak_ptr_factory_{this};
316 
317   DISALLOW_COPY_AND_ASSIGN(MCSClient);
318 };
319 
320 } // namespace gcm
321 
322 #endif  // GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
323