#pragma once
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include <aws/crt/Exports.h>
#include <aws/crt/StlAllocator.h>
#include <aws/crt/Types.h>
#include <aws/crt/http/HttpConnection.h>
#include <aws/crt/io/SocketOptions.h>
#include <aws/crt/io/TlsOptions.h>

#include <aws/mqtt/client.h>

#include <atomic>
#include <functional>
#include <memory>

// NOTE(review): std::pair is used by the multi-topic Subscribe() overloads below but <utility> is not
// included directly; presumably it arrives transitively through the headers above - confirm.

namespace Aws
{
    namespace Crt
    {
        namespace Io
        {
            class ClientBootstrap;
        }

        namespace Http
        {
            class HttpRequest;
        }

        namespace Mqtt
        {
            class MqttClient;
            class MqttConnection;

            /**
             * Invoked upon connection loss.
             */
            using OnConnectionInterruptedHandler = std::function<void(MqttConnection &connection, int error)>;

            /**
             * Invoked when the connection is resumed.
             */
            using OnConnectionResumedHandler =
                std::function<void(MqttConnection &connection, ReturnCode connectCode, bool sessionPresent)>;

            /**
             * Invoked when a connack message is received, or an error occurred.
             */
            using OnConnectionCompletedHandler = std::function<
                void(MqttConnection &connection, int errorCode, ReturnCode returnCode, bool sessionPresent)>;

            /**
             * Invoked when a suback message is received.
             */
            using OnSubAckHandler = std::function<
                void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)>;

            /**
             * Invoked when a suback message for multiple topics is received.
             */
            using OnMultiSubAckHandler = std::function<void(
                MqttConnection &connection,
                uint16_t packetId,
                const Vector<String> &topics,
                QOS qos,
                int errorCode)>;

            /**
             * Invoked when a disconnect message has been sent.
             */
            using OnDisconnectHandler = std::function<void(MqttConnection &connection)>;

            /**
             * Invoked upon receipt of a Publish message on a subscribed topic.
             * @param connection The connection object.
             * @param topic The information channel to which the payload data was published.
             * @param payload The payload data.
             * @param dup DUP flag. If true, this might be a re-delivery of an earlier
             *            attempt to send the message.
             * @param qos Quality of Service used to deliver the message.
             * @param retain Retain flag. If true, the message was sent as a result of
             *               a new subscription being made by the client.
             */
            using OnMessageReceivedHandler = std::function<void(
                MqttConnection &connection,
                const String &topic,
                const ByteBuf &payload,
                bool dup,
                QOS qos,
                bool retain)>;

            /**
             * @deprecated Use OnMessageReceivedHandler, which additionally receives the
             * dup, qos and retain values of the message.
             */
            using OnPublishReceivedHandler =
                std::function<void(MqttConnection &connection, const String &topic, const ByteBuf &payload)>;

            /**
             * Completion callback type accepted by MqttConnection::Unsubscribe() and
             * MqttConnection::Publish().
             */
            using OnOperationCompleteHandler =
                std::function<void(MqttConnection &connection, uint16_t packetId, int errorCode)>;

            /**
             * Callback for users to invoke upon completion of the (presumably asynchronous) process
             * started by their OnWebSocketHandshakeIntercept callback.
             */
            using OnWebSocketHandshakeInterceptComplete =
                std::function<void(const std::shared_ptr<Http::HttpRequest> &, int errorCode)>;

            /**
             * Invoked during the websocket handshake to give users the opportunity to transform an http
             * request for purposes such as signing/authorization. Returning from this function does not
             * continue the websocket handshake, since some workflows may be asynchronous. To accommodate
             * that, onComplete must be invoked upon completion of the signing process.
             */
            using OnWebSocketHandshakeIntercept = std::function<
                void(std::shared_ptr<Http::HttpRequest> req, const OnWebSocketHandshakeInterceptComplete &onComplete)>;

            /**
             * Represents a persistent Mqtt Connection. The memory is owned by MqttClient.
             * To get a new instance of this class, see MqttClient::NewConnection. Unless
             * specified, all function arguments need only live through the duration of the
             * function call.
             *
             * NOTE(review): MqttClient::NewConnection returns std::shared_ptr<MqttConnection>, so the
             * "memory is owned by MqttClient" wording may be stale - confirm against the implementation.
             */
            class AWS_CRT_CPP_API MqttConnection final
            {
                friend class MqttClient;

              public:
                ~MqttConnection();
                // Neither copyable nor movable.
                MqttConnection(const MqttConnection &) = delete;
                MqttConnection(MqttConnection &&) = delete;
                MqttConnection &operator=(const MqttConnection &) = delete;
                MqttConnection &operator=(MqttConnection &&) = delete;
                /**
                 * @return true if the instance is in a valid state, false otherwise.
                 */
                operator bool() const noexcept;
                /**
                 * @return the value of the last aws error encountered by operations on this instance.
                 */
                int LastError() const noexcept;

                /**
                 * Sets LastWill for the connection. The memory backing payload must outlive the connection.
                 */
                bool SetWill(const char *topic, QOS qos, bool retain, const ByteBuf &payload) noexcept;

                /**
                 * Sets login credentials for the connection. These must be set before the Connect call
                 * if they are to be used.
                 */
                bool SetLogin(const char *userName, const char *password) noexcept;

                /**
                 * @deprecated Sets websocket proxy options. Replaced by SetHttpProxyOptions.
                 */
                bool SetWebsocketProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;

                /**
                 * Sets http proxy options. In order to use an http proxy with mqtt, either
                 * (1) Websockets are used, or
                 * (2) Mqtt-over-tls is used and the ALPN list of the tls context contains a tag that
                 *     resolves to mqtt.
                 */
                bool SetHttpProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;

                /**
                 * Customize time to wait between reconnect attempts.
                 * The time will start at min and multiply by 2 until max is reached.
                 * The time resets back to min after a successful connection.
                 * This function should only be called before Connect().
                 */
                bool SetReconnectTimeout(uint64_t min_seconds, uint64_t max_seconds) noexcept;

                /**
                 * Initiates the connection. OnConnectionCompleted will
                 * be invoked in an event-loop thread.
                 *
                 * NOTE(review): the meaning of the 0 defaults for keepAliveTimeSecs, pingTimeoutMs and
                 * protocolOperationTimeoutMs is not stated in this header; presumably they select the
                 * underlying library's defaults - confirm.
                 */
                bool Connect(
                    const char *clientId,
                    bool cleanSession,
                    uint16_t keepAliveTimeSecs = 0,
                    uint32_t pingTimeoutMs = 0,
                    uint32_t protocolOperationTimeoutMs = 0) noexcept;

                /**
                 * Initiates disconnect. OnDisconnectHandler will be invoked in an event-loop thread.
                 */
                bool Disconnect() noexcept;

                /**
                 * @return the pointer to the underlying mqtt connection
                 */
                aws_mqtt_client_connection *GetUnderlyingConnection() noexcept;

                /**
                 * Subscribes to topicFilter. OnMessageReceivedHandler will be invoked from an event-loop
                 * thread upon an incoming Publish message. OnSubAckHandler will be invoked
                 * upon receipt of a suback message.
                 *
                 * NOTE(review): the uint16_t return value is presumably the packet id of the request;
                 * the value returned on failure is not documented here - confirm.
                 */
                uint16_t Subscribe(
                    const char *topicFilter,
                    QOS qos,
                    OnMessageReceivedHandler &&onMessage,
                    OnSubAckHandler &&onSubAck) noexcept;

                /**
                 * @deprecated Use alternate Subscribe()
                 */
                uint16_t Subscribe(
                    const char *topicFilter,
                    QOS qos,
                    OnPublishReceivedHandler &&onPublish,
                    OnSubAckHandler &&onSubAck) noexcept;

                /**
                 * Subscribes to multiple topicFilters. OnMessageReceivedHandler will be invoked from an
                 * event-loop thread upon an incoming Publish message. OnMultiSubAckHandler will be invoked
                 * upon receipt of a suback message.
                 */
                uint16_t Subscribe(
                    const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
                    QOS qos,
                    OnMultiSubAckHandler &&onOpComplete) noexcept;

                /**
                 * @deprecated Use alternate Subscribe()
                 */
                uint16_t Subscribe(
                    const Vector<std::pair<const char *, OnPublishReceivedHandler>> &topicFilters,
                    QOS qos,
                    OnMultiSubAckHandler &&onOpComplete) noexcept;

                /**
                 * Installs a handler for all incoming publish messages, regardless of whether Subscribe
                 * has been called on the topic.
                 */
                bool SetOnMessageHandler(OnMessageReceivedHandler &&onMessage) noexcept;

                /**
                 * @deprecated Use alternate SetOnMessageHandler()
                 */
                bool SetOnMessageHandler(OnPublishReceivedHandler &&onPublish) noexcept;

                /**
                 * Unsubscribes from topicFilter. OnOperationCompleteHandler will be invoked upon receipt of
                 * an unsuback message.
                 */
                uint16_t Unsubscribe(const char *topicFilter, OnOperationCompleteHandler &&onOpComplete) noexcept;

                /**
                 * Publishes to topic. The backing memory for payload must stay available until the
                 * OnOperationCompleteHandler has been invoked.
                 */
                uint16_t Publish(
                    const char *topic,
                    QOS qos,
                    bool retain,
                    const ByteBuf &payload,
                    OnOperationCompleteHandler &&onOpComplete) noexcept;

                // Public, user-assignable callbacks. Each handler type is described at its alias
                // declaration earlier in this header.
                OnConnectionInterruptedHandler OnConnectionInterrupted;
                OnConnectionResumedHandler OnConnectionResumed;
                OnConnectionCompletedHandler OnConnectionCompleted;
                OnDisconnectHandler OnDisconnect;
                OnWebSocketHandshakeIntercept WebsocketInterceptor;

              private:
                // Raw, non-smart pointers to the C-level client and connection objects.
                // NOTE(review): who releases these is decided in the implementation file - confirm there.
                aws_mqtt_client *m_owningClient;
                aws_mqtt_client_connection *m_underlyingConnection;
                // Endpoint and transport settings; the constructors below take matching arguments.
                String m_hostName;
                uint16_t m_port;
                Crt::Io::TlsContext m_tlsContext;
                Io::TlsConnectionOptions m_tlsOptions;
                Io::SocketOptions m_socketOptions;
                Crt::Optional<Http::HttpClientConnectionProxyOptions> m_proxyOptions;
                // NOTE(review): presumably the user-data attached to the handler installed via
                // SetOnMessageHandler() - confirm against the implementation.
                void *m_onAnyCbData;
                bool m_useTls;
                bool m_useWebsocket;

                // Constructors are private; MqttClient is declared a friend above and exposes
                // NewConnection() as the public way to obtain an instance.

                // Overload taking a TlsContext.
                MqttConnection(
                    aws_mqtt_client *client,
                    const char *hostName,
                    uint16_t port,
                    const Io::SocketOptions &socketOptions,
                    const Crt::Io::TlsContext &tlsContext,
                    bool useWebsocket) noexcept;

                // Overload without a TlsContext.
                MqttConnection(
                    aws_mqtt_client *client,
                    const char *hostName,
                    uint16_t port,
                    const Io::SocketOptions &socketOptions,
                    bool useWebsocket) noexcept;

                // Static functions with C-style signatures (raw connection pointer plus void* user data).
                // NOTE(review): presumably registered as callbacks with the underlying C connection and
                // used to forward into the std::function handlers - confirm in the implementation.
                static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
                static void s_onConnectionCompleted(
                    aws_mqtt_client_connection *,
                    int errorCode,
                    enum aws_mqtt_connect_return_code returnCode,
                    bool sessionPresent,
                    void *userData);
                // NOTE(review): declared with ReturnCode whereas s_onConnectionCompleted spells out
                // enum aws_mqtt_connect_return_code; presumably the same type - confirm in Types.h.
                static void s_onConnectionResumed(
                    aws_mqtt_client_connection *,
                    ReturnCode returnCode,
                    bool sessionPresent,
                    void *userData);

                static void s_onDisconnect(aws_mqtt_client_connection *connection, void *userData);
                static void s_onPublish(
                    aws_mqtt_client_connection *connection,
                    const aws_byte_cursor *topic,
                    const aws_byte_cursor *payload,
                    bool dup,
                    enum aws_mqtt_qos qos,
                    bool retain,
                    void *user_data);

                static void s_onSubAck(
                    aws_mqtt_client_connection *connection,
                    uint16_t packetId,
                    const struct aws_byte_cursor *topic,
                    enum aws_mqtt_qos qos,
                    int error_code,
                    void *userdata);
                static void s_onMultiSubAck(
                    aws_mqtt_client_connection *connection,
                    uint16_t packetId,
                    const struct aws_array_list *topic_subacks,
                    int error_code,
                    void *userdata);
                static void s_onOpComplete(
                    aws_mqtt_client_connection *connection,
                    uint16_t packetId,
                    int errorCode,
                    void *userdata);

                static void s_onWebsocketHandshake(
                    struct aws_http_message *request,
                    void *user_data,
                    aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
                    void *complete_ctx);

                // NOTE(review): presumably the initialization shared by both constructors - confirm.
                static void s_connectionInit(
                    MqttConnection *self,
                    const char *hostName,
                    uint16_t port,
                    const Io::SocketOptions &socketOptions);
            };

            /**
             * An MQTT client. This is a move-only type. Unless otherwise specified,
             * all function arguments need only live through the duration of the
             * function call.
             */
            class AWS_CRT_CPP_API MqttClient final
            {
              public:
                /**
                 * Initialize an MqttClient using bootstrap and allocator
                 */
                MqttClient(Io::ClientBootstrap &bootstrap, Allocator *allocator = g_allocator) noexcept;

                ~MqttClient();
                // Copying is disabled; move construction and move assignment are provided.
                MqttClient(const MqttClient &) = delete;
                MqttClient(MqttClient &&) noexcept;
                MqttClient &operator=(const MqttClient &) = delete;
                MqttClient &operator=(MqttClient &&) noexcept;
                /**
                 * @return true if the instance is in a valid state, false otherwise.
                 */
                operator bool() const noexcept;
                /**
                 * @return the value of the last aws error encountered by operations on this instance.
                 */
                int LastError() const noexcept;

                /**
                 * Create a new connection object using TLS from the client. The client must outlive
                 * all of its connection instances.
                 */
                std::shared_ptr<MqttConnection> NewConnection(
                    const char *hostName,
                    uint16_t port,
                    const Io::SocketOptions &socketOptions,
                    const Crt::Io::TlsContext &tlsContext,
                    bool useWebsocket = false) noexcept;
                /**
                 * Create a new connection object over plain text from the client. The client must outlive
                 * all of its connection instances.
                 */
                std::shared_ptr<MqttConnection> NewConnection(
                    const char *hostName,
                    uint16_t port,
                    const Io::SocketOptions &socketOptions,
                    bool useWebsocket = false) noexcept;

              private:
                // Raw pointer to the C-level mqtt client.
                aws_mqtt_client *m_client;
            };
        } // namespace Mqtt
    } // namespace Crt
} // namespace Aws