1 #pragma once
2 /**
3  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4  * SPDX-License-Identifier: Apache-2.0.
5  */
6 #include <aws/crt/Exports.h>
7 #include <aws/crt/StlAllocator.h>
8 #include <aws/crt/Types.h>
9 #include <aws/crt/http/HttpConnection.h>
10 #include <aws/crt/io/SocketOptions.h>
11 #include <aws/crt/io/TlsOptions.h>
12 
13 #include <aws/mqtt/client.h>
14 
15 #include <atomic>
16 #include <functional>
17 #include <memory>
18 
19 namespace Aws
20 {
21     namespace Crt
22     {
23         namespace Io
24         {
               // Forward declaration only: within this header ClientBootstrap is used solely by
               // reference (the MqttClient constructor parameter), so its full definition is not needed.
25             class ClientBootstrap;
26         }
27 
28         namespace Http
29         {
               // Forward declaration only: within this header HttpRequest is used solely through
               // std::shared_ptr in the websocket handshake handler aliases.
30             class HttpRequest;
31         }
32 
33         namespace Mqtt
34         {
               // Forward declarations: the handler aliases below take MqttConnection by reference, and
               // MqttConnection names MqttClient as a friend, before either class is defined.
35             class MqttClient;
36             class MqttConnection;
37 
38             /**
39              * Invoked upon connection loss.
                * @param connection    The connection object.
                * @param error         Error code associated with the interruption (presumably an aws
                *                      error code, as returned by LastError() -- confirm).
40              */
41             using OnConnectionInterruptedHandler = std::function<void(MqttConnection &connection, int error)>;
42 
43             /**
44              * Invoked upon connection resumed.
                * @param connection     The connection object.
                * @param connectCode    Return code of the reconnect attempt (presumably the connack
                *                       return code -- confirm against the implementation).
                * @param sessionPresent Presumably true if an existing session was resumed -- confirm.
45              */
46             using OnConnectionResumedHandler =
47                 std::function<void(MqttConnection &connection, ReturnCode connectCode, bool sessionPresent)>;
48 
49             /**
50              * Invoked when a connack message is received, or an error occurred.
                * @param connection     The connection object.
                * @param errorCode      Error code for the connection attempt (presumably 0 when the
                *                       connack was received without error -- confirm).
                * @param returnCode     Connect return code (presumably only meaningful when no error
                *                       occurred -- confirm).
                * @param sessionPresent Presumably true if an existing session was resumed -- confirm.
51              */
52             using OnConnectionCompletedHandler = std::function<
53                 void(MqttConnection &connection, int errorCode, ReturnCode returnCode, bool sessionPresent)>;
54 
55             /**
56              * Invoked when a suback message is received.
                * @param connection    The connection object.
                * @param packetId      Packet id of the subscribe request being acknowledged (presumably
                *                      the value returned by Subscribe() -- confirm).
                * @param topic         The topic the acknowledgement refers to.
                * @param qos           Quality of Service reported for the subscription.
                * @param errorCode     Error code for the operation (presumably 0 on success -- confirm).
57              */
58             using OnSubAckHandler = std::function<
59                 void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)>;
60 
61             /**
62              * Invoked when a suback message for multiple topics is received.
                * @param connection    The connection object.
                * @param packetId      Packet id of the subscribe request being acknowledged (presumably
                *                      the value returned by Subscribe() -- confirm).
                * @param topics        The topics the acknowledgement refers to.
                * @param qos           A single Quality of Service value for the whole request.
                * @param errorCode     Error code for the operation (presumably 0 on success -- confirm).
63              */
64             using OnMultiSubAckHandler = std::function<void(
65                 MqttConnection &connection,
66                 uint16_t packetId,
67                 const Vector<String> &topics,
68                 QOS qos,
69                 int errorCode)>;
70 
71             /**
72              * Invoked when a disconnect message has been sent.
                * @param connection    The connection object that was disconnected.
73              */
74             using OnDisconnectHandler = std::function<void(MqttConnection &connection)>;
75 
76             /**
77              * Invoked upon receipt of a Publish message on a subscribed topic.
78              * @param connection    The connection object
79              * @param topic         The information channel to which the payload data was published.
80              * @param payload       The payload data.
81              * @param dup           DUP flag. If true, this might be re-delivery of an earlier
82              *                      attempt to send the message.
83              * @param qos           Quality of Service used to deliver the message.
84              * @param retain        Retain flag. If true, the message was sent as a result of
85              *                      a new subscription being made by the client.
                *
                * NOTE(review): topic and payload are passed by const reference; presumably they are only
                * valid for the duration of the call, so copy them if they must outlive it -- confirm.
86              */
87             using OnMessageReceivedHandler = std::function<void(
88                 MqttConnection &connection,
89                 const String &topic,
90                 const ByteBuf &payload,
91                 bool dup,
92                 QOS qos,
93                 bool retain)>;
94 
95             /**
                * Legacy form of the incoming-publish handler: it receives only the connection, topic and
                * payload, without the dup, qos and retain values that OnMessageReceivedHandler provides.
96              * @deprecated Use OnMessageReceivedHandler
97              */
98             using OnPublishReceivedHandler =
99                 std::function<void(MqttConnection &connection, const String &topic, const ByteBuf &payload)>;
100 
                /**
                 * Completion callback taken by MqttConnection::Unsubscribe and MqttConnection::Publish.
                 * @param connection    The connection object.
                 * @param packetId      Packet id of the completed operation (presumably the value returned
                 *                      by the call that started it -- confirm).
                 * @param errorCode     Error code for the operation (presumably 0 on success -- confirm).
                 */
101             using OnOperationCompleteHandler =
102                 std::function<void(MqttConnection &connection, uint16_t packetId, int errorCode)>;
103 
104             /**
105              * Callback for users to invoke upon completion of, presumably asynchronous, OnWebSocketHandshakeIntercept
106              * callback's initiated process.
                 * The first argument is the (possibly transformed) http request for the handshake; errorCode
                 * reports the outcome of the transformation (presumably 0 on success -- confirm).
107              */
108             using OnWebSocketHandshakeInterceptComplete =
109                 std::function<void(const std::shared_ptr<Http::HttpRequest> &, int errorCode)>;
110 
111             /**
112              * Invoked during websocket handshake to give users opportunity to transform an http request for purposes
113              * such as signing/authorization etc... Returning from this function does not continue the websocket
114              * handshake since some work flows may be asynchronous. To accommodate that, onComplete must be invoked upon
115              * completion of the signing process.
                 * @param req           The http request for the websocket handshake, to be transformed.
                 * @param onComplete    Callback that must be invoked once the transformation has finished.
116              */
117             using OnWebSocketHandshakeIntercept = std::function<
118                 void(std::shared_ptr<Http::HttpRequest> req, const OnWebSocketHandshakeInterceptComplete &onComplete)>;
119 
120             /**
121              * Represents a persistent Mqtt Connection. The memory is owned by MqttClient.
122              * To get a new instance of this class, see MqttClient::NewConnection. Unless
123              * specified all function arguments need only to live through the duration of the
124              * function call.
                 *
                 * Instances are neither copyable nor movable (all four copy/move operations are deleted),
                 * and the constructors are private: only MqttClient, declared a friend, can create one.
                 *
                 * Methods returning bool presumably report success with true (see LastError() otherwise),
                 * and methods returning uint16_t presumably return the packet id of the request. Neither
                 * is visible in this header -- confirm in the implementation.
                 *
                 * NOTE(review): MqttClient::NewConnection returns std::shared_ptr<MqttConnection>, which reads
                 * as ownership shared with the caller rather than "owned by MqttClient" -- confirm which is
                 * intended and align the wording.
125              */
126             class AWS_CRT_CPP_API MqttConnection final
127             {
128                 friend class MqttClient;
129
130               public:
131                 ~MqttConnection();
132                 MqttConnection(const MqttConnection &) = delete;
133                 MqttConnection(MqttConnection &&) = delete;
134                 MqttConnection &operator=(const MqttConnection &) = delete;
135                 MqttConnection &operator=(MqttConnection &&) = delete;
136                 /**
137                  * @return true if the instance is in a valid state, false otherwise.
                     *
                     * NOTE(review): the conversion is not declared explicit, so it also takes part in
                     * implicit conversions to arithmetic types.
138                  */
139                 operator bool() const noexcept;
140                 /**
141                  * @return the value of the last aws error encountered by operations on this instance.
142                  */
143                 int LastError() const noexcept;
144
145                 /**
146                  * Sets LastWill for the connection. The memory backing payload must outlive the connection.
                     * @param topic     Topic for the will message.
                     * @param qos       Quality of Service for the will message.
                     * @param retain    Retain flag for the will message.
                     * @param payload   Payload of the will message; see the lifetime requirement above.
147                  */
148                 bool SetWill(const char *topic, QOS qos, bool retain, const ByteBuf &payload) noexcept;
149
150                 /**
151                  * Sets login credentials for the connection. These must be set before the Connect call
152                  * if they are to be used.
                     * @param userName  User name to log in with.
                     * @param password  Password to log in with.
153                  */
154                 bool SetLogin(const char *userName, const char *password) noexcept;
155
156                 /**
157                  * @deprecated Sets websocket proxy options. Replaced by SetHttpProxyOptions.
158                  */
159                 bool SetWebsocketProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;
160
161                 /**
162                  * Sets http proxy options. In order to use an http proxy with mqtt either
163                  *   (1) Websockets are used
164                  *   (2) Mqtt-over-tls is used and the ALPN list of the tls context contains a tag that resolves to mqtt
                     * @param proxyOptions  The proxy configuration to use for this connection.
165                  */
166                 bool SetHttpProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;
167
168                 /**
169                  * Customize time to wait between reconnect attempts.
170                  * The time will start at min and multiply by 2 until max is reached.
171                  * The time resets back to min after a successful connection.
172                  * This function should only be called before Connect().
                     * @param min_seconds   Initial wait between reconnect attempts, in seconds.
                     * @param max_seconds   Upper bound for the wait between reconnect attempts, in seconds.
173                  */
174                 bool SetReconnectTimeout(uint64_t min_seconds, uint64_t max_seconds) noexcept;
175
176                 /**
177                  * Initiates the connection, OnConnectionCompleted will
178                  * be invoked in an event-loop thread.
                     * @param clientId                      Client identifier to connect with.
                     * @param cleanSession                  Whether to request a clean session.
                     * @param keepAliveTimeSecs             Keep-alive interval, in seconds.
                     * @param pingTimeoutMs                 Ping timeout, in milliseconds.
                     * @param protocolOperationTimeoutMs    Protocol operation timeout, in milliseconds.
                     *
                     * NOTE(review): the three timing parameters default to 0; presumably 0 selects the
                     * underlying library's default -- confirm in the implementation.
179                  */
180                 bool Connect(
181                     const char *clientId,
182                     bool cleanSession,
183                     uint16_t keepAliveTimeSecs = 0,
184                     uint32_t pingTimeoutMs = 0,
185                     uint32_t protocolOperationTimeoutMs = 0) noexcept;
186
187                 /**
188                  * Initiates disconnect, OnDisconnectHandler will be invoked in an event-loop thread.
189                  */
190                 bool Disconnect() noexcept;
191
192                 /**
193                  * @return the pointer to the underlying mqtt connection
194                  */
195                 aws_mqtt_client_connection *GetUnderlyingConnection() noexcept;
196
197                 /**
198                  * Subscribes to topicFilter. OnMessageReceivedHandler will be invoked from an event-loop
199                  * thread upon an incoming Publish message. OnSubAckHandler will be invoked
200                  * upon receipt of a suback message.
                     * @param topicFilter   Topic filter to subscribe to.
                     * @param qos           Quality of Service requested for the subscription.
                     * @param onMessage     Handler for incoming Publish messages matching topicFilter.
                     * @param onSubAck      Handler for the suback message.
201                  */
202                 uint16_t Subscribe(
203                     const char *topicFilter,
204                     QOS qos,
205                     OnMessageReceivedHandler &&onMessage,
206                     OnSubAckHandler &&onSubAck) noexcept;
207
208                 /**
209                  * @deprecated Use alternate Subscribe()
210                  */
211                 uint16_t Subscribe(
212                     const char *topicFilter,
213                     QOS qos,
214                     OnPublishReceivedHandler &&onPublish,
215                     OnSubAckHandler &&onSubAck) noexcept;
216
217                 /**
218                  * Subscribes to multiple topicFilters. OnMessageReceivedHandler will be invoked from an event-loop
219                  * thread upon an incoming Publish message. OnMultiSubAckHandler will be invoked
220                  * upon receipt of a suback message.
                     * @param topicFilters  Pairs of (topic filter, handler for messages matching that filter).
                     * @param qos           A single Quality of Service value applied to the whole request.
                     * @param onOpComplete  Handler for the suback message.
221                  */
222                 uint16_t Subscribe(
223                     const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
224                     QOS qos,
225                     OnMultiSubAckHandler &&onOpComplete) noexcept;
226
227                 /**
228                  * @deprecated Use alternate Subscribe()
229                  */
230                 uint16_t Subscribe(
231                     const Vector<std::pair<const char *, OnPublishReceivedHandler>> &topicFilters,
232                     QOS qos,
233                     OnMultiSubAckHandler &&onOpComplete) noexcept;
234
235                 /**
236                  * Installs a handler for all incoming publish messages, regardless of if Subscribe has been
237                  * called on the topic.
                     * @param onMessage     Handler to install.
238                  */
239                 bool SetOnMessageHandler(OnMessageReceivedHandler &&onMessage) noexcept;
240
241                 /**
242                  * @deprecated Use alternate SetOnMessageHandler()
243                  */
244                 bool SetOnMessageHandler(OnPublishReceivedHandler &&onPublish) noexcept;
245
246                 /**
247                  * Unsubscribes from topicFilter. OnOperationCompleteHandler will be invoked upon receipt of
248                  * an unsuback message.
                     * @param topicFilter   Topic filter to unsubscribe from.
                     * @param onOpComplete  Handler for the unsuback message.
249                  */
250                 uint16_t Unsubscribe(const char *topicFilter, OnOperationCompleteHandler &&onOpComplete) noexcept;
251
252                 /**
253                  * Publishes to topic. The backing memory for payload must stay available until the
254                  * OnOperationCompleteHandler has been invoked.
                     * @param topic         Topic to publish to.
                     * @param qos           Quality of Service to publish with.
                     * @param retain        Retain flag for the message.
                     * @param payload       Message payload; see the lifetime requirement above.
                     * @param onOpComplete  Handler invoked when the publish operation completes.
255                  */
256                 uint16_t Publish(
257                     const char *topic,
258                     QOS qos,
259                     bool retain,
260                     const ByteBuf &payload,
261                     OnOperationCompleteHandler &&onOpComplete) noexcept;
262
                    /**
                     * User-assignable handlers, exposed as public data members. Per the Connect() and
                     * Disconnect() documentation, OnConnectionCompleted and OnDisconnect are invoked in an
                     * event-loop thread.
                     * NOTE(review): presumably these should be assigned before Connect() is called, and
                     * WebsocketInterceptor only matters for websocket connections -- confirm.
                     */
263                 OnConnectionInterruptedHandler OnConnectionInterrupted;
264                 OnConnectionResumedHandler OnConnectionResumed;
265                 OnConnectionCompletedHandler OnConnectionCompleted;
266                 OnDisconnectHandler OnDisconnect;
267                 OnWebSocketHandshakeIntercept WebsocketInterceptor;
268
269               private:
                    /*
                     * Connection state. Several members mirror the private constructors' parameters
                     * (client, hostName, port, socketOptions, tlsContext, useWebsocket).
                     * NOTE(review): no default member initializers are declared here; initialization
                     * presumably happens in the constructors / s_connectionInit -- confirm.
                     */
270                 aws_mqtt_client *m_owningClient;
271                 aws_mqtt_client_connection *m_underlyingConnection;
272                 String m_hostName;
273                 uint16_t m_port;
274                 Crt::Io::TlsContext m_tlsContext;
275                 Io::TlsConnectionOptions m_tlsOptions;
276                 Io::SocketOptions m_socketOptions;
277                 Crt::Optional<Http::HttpClientConnectionProxyOptions> m_proxyOptions;
278                 void *m_onAnyCbData;
279                 bool m_useTls;
280                 bool m_useWebsocket;
281
                    /* Constructs a connection that uses the supplied TLS context. */
282                 MqttConnection(
283                     aws_mqtt_client *client,
284                     const char *hostName,
285                     uint16_t port,
286                     const Io::SocketOptions &socketOptions,
287                     const Crt::Io::TlsContext &tlsContext,
288                     bool useWebsocket) noexcept;
289
                    /* Constructs a connection without a TLS context. */
290                 MqttConnection(
291                     aws_mqtt_client *client,
292                     const char *hostName,
293                     uint16_t port,
294                     const Io::SocketOptions &socketOptions,
295                     bool useWebsocket) noexcept;
296
                    /*
                     * Static callbacks whose signatures take the C types of the underlying mqtt library and
                     * an opaque user-data pointer. Presumably they are registered with the underlying
                     * connection and forward to the user-facing handlers above -- confirm in the
                     * implementation.
                     */
297                 static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
298                 static void s_onConnectionCompleted(
299                     aws_mqtt_client_connection *,
300                     int errorCode,
301                     enum aws_mqtt_connect_return_code returnCode,
302                     bool sessionPresent,
303                     void *userData);
304                 static void s_onConnectionResumed(
305                     aws_mqtt_client_connection *,
306                     ReturnCode returnCode,
307                     bool sessionPresent,
308                     void *userData);
309
310                 static void s_onDisconnect(aws_mqtt_client_connection *connection, void *userData);
311                 static void s_onPublish(
312                     aws_mqtt_client_connection *connection,
313                     const aws_byte_cursor *topic,
314                     const aws_byte_cursor *payload,
315                     bool dup,
316                     enum aws_mqtt_qos qos,
317                     bool retain,
318                     void *user_data);
319
320                 static void s_onSubAck(
321                     aws_mqtt_client_connection *connection,
322                     uint16_t packetId,
323                     const struct aws_byte_cursor *topic,
324                     enum aws_mqtt_qos qos,
325                     int error_code,
326                     void *userdata);
327                 static void s_onMultiSubAck(
328                     aws_mqtt_client_connection *connection,
329                     uint16_t packetId,
330                     const struct aws_array_list *topic_subacks,
331                     int error_code,
332                     void *userdata);
333                 static void s_onOpComplete(
334                     aws_mqtt_client_connection *connection,
335                     uint16_t packetId,
336                     int errorCode,
337                     void *userdata);
338
339                 static void s_onWebsocketHandshake(
340                     struct aws_http_message *request,
341                     void *user_data,
342                     aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
343                     void *complete_ctx);
344
                    /*
                     * Initialization helper taking the connection plus the host, port and socket options;
                     * presumably shared by both constructors -- confirm.
                     */
345                 static void s_connectionInit(
346                     MqttConnection *self,
347                     const char *hostName,
348                     uint16_t port,
349                     const Io::SocketOptions &socketOptions);
350             };
351 
352             /**
353              * An MQTT client. This is a move-only type. Unless otherwise specified,
354              * all function arguments need only to live through the duration of the
355              * function call.
                 *
                 * Move-only: the copy constructor and copy assignment are deleted, while the move
                 * constructor and move assignment are declared noexcept.
356              */
357             class AWS_CRT_CPP_API MqttClient final
358             {
359               public:
360                 /**
361                  * Initialize an MqttClient using bootstrap and allocator
                     * @param bootstrap     Client bootstrap to use for this client.
                     * @param allocator     Allocator to use; defaults to g_allocator.
                     *
                     * NOTE(review): because allocator has a default, this constructor is callable with a
                     * single argument and is not declared explicit.
362                  */
363                 MqttClient(Io::ClientBootstrap &bootstrap, Allocator *allocator = g_allocator) noexcept;
364
365                 ~MqttClient();
366                 MqttClient(const MqttClient &) = delete;
367                 MqttClient(MqttClient &&) noexcept;
368                 MqttClient &operator=(const MqttClient &) = delete;
369                 MqttClient &operator=(MqttClient &&) noexcept;
370                 /**
371                  * @return true if the instance is in a valid state, false otherwise.
372                  */
373                 operator bool() const noexcept;
374                 /**
375                  * @return the value of the last aws error encountered by operations on this instance.
376                  */
377                 int LastError() const noexcept;
378
379                 /**
380                  * Create a new connection object using TLS from the client. The client must outlive
381                  * all of its connection instances.
                     * @param hostName      Host to connect to.
                     * @param port          Port to connect to.
                     * @param socketOptions Socket options for the connection.
                     * @param tlsContext    TLS context for the connection.
                     * @param useWebsocket  Whether to use websockets; defaults to false.
                     * @return the new connection. NOTE(review): the failure result (e.g. a null pointer or
                     *         an instance in an invalid state) is not visible here -- confirm and check it.
382                  */
383                 std::shared_ptr<MqttConnection> NewConnection(
384                     const char *hostName,
385                     uint16_t port,
386                     const Io::SocketOptions &socketOptions,
387                     const Crt::Io::TlsContext &tlsContext,
388                     bool useWebsocket = false) noexcept;
389                 /**
390                  * Create a new connection object over plain text from the client. The client must outlive
391                  * all of its connection instances.
                     * @param hostName      Host to connect to.
                     * @param port          Port to connect to.
                     * @param socketOptions Socket options for the connection.
                     * @param useWebsocket  Whether to use websockets; defaults to false.
                     * @return the new connection. NOTE(review): the failure result (e.g. a null pointer or
                     *         an instance in an invalid state) is not visible here -- confirm and check it.
392                  */
393                 std::shared_ptr<MqttConnection> NewConnection(
394                     const char *hostName,
395                     uint16_t port,
396                     const Io::SocketOptions &socketOptions,
397                     bool useWebsocket = false) noexcept;
398
399               private:
                    /* Handle to the underlying C mqtt client. */
400                 aws_mqtt_client *m_client;
401             };
402         } // namespace Mqtt
403     }     // namespace Crt
404 } // namespace Aws
405