1%% The MIT License (MIT) 2%% 3%% Copyright (c) <2013> <hellomatty@gmail.com> 4%% 5%% Permission is hereby granted, free of charge, to any person obtaining a copy 6%% of this software and associated documentation files (the "Software"), to deal 7%% in the Software without restriction, including without limitation the rights 8%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9%% copies of the Software, and to permit persons to whom the Software is 10%% furnished to do so, subject to the following conditions: 11%% 12%% The above copyright notice and this permission notice shall be included in 13%% all copies or substantial portions of the Software. 14%% 15%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21%% THE SOFTWARE. 22 23%% Modified from: https://code.google.com/p/my-mqtt4erl/source/browse/src/mqtt_core.erl. 24-module(mqtt_frame). 25-author('hellomatty@gmail.com'). 26 27%% 28%% An erlang client for MQTT (http://www.mqtt.org/) 29%% 30 31-include_lib("mqtt.hrl"). 32 33-export([encode/1, decode/1]). 34-export([set_connect_options/1, set_publish_options/1, command_for_type/1]). 
%%%===================================================================
%%% API functions
%%%===================================================================

%% Serialise an #mqtt{} record into one binary frame, laid out as:
%% fixed-header byte, encoded remaining length, variable header, payload.
encode(#mqtt{} = Message) ->
    {VarHeader, Body} = encode_message(Message),
    HeaderByte = encode_fixed_header(Message),
    RemainingLength = encode_length(byte_size(VarHeader) + byte_size(Body)),
    <<HeaderByte/binary, RemainingLength/binary, VarHeader/binary, Body/binary>>.

%% Try to decode one frame from the front of a binary.
%% Returns {Message, Left}, where Left is whatever follows the frame, or
%% the atom 'more' when the binary does not yet hold a complete frame.
decode(<<HeaderByte:8/big, AfterHeader/binary>>) ->
    case decode_length(AfterHeader) of
        {RemainingLength, AfterLength} when byte_size(AfterLength) >= RemainingLength ->
            <<Body:RemainingLength/binary, Left/binary>> = AfterLength,
            Header = decode_fixed_header(<<HeaderByte>>),
            {decode_message(Header, Body), Left};
        {_RemainingLength, _Partial} ->
            %% The length field is known but the body has not fully arrived.
            more;
        more ->
            more
    end;
decode(_Data) ->
    more.

%% Build a #connect_options{} record from an option list, starting from
%% the record's default field values.
set_connect_options(Options) ->
    Defaults = #connect_options{},
    set_connect_options(Options, Defaults).

%% Build a #publish_options{} record from an option list, starting from
%% the record's default field values.
set_publish_options(Options) ->
    Defaults = #publish_options{},
    set_publish_options(Options, Defaults).

%% Translate a message type constant into its command atom; any value
%% that is not one of the listed types yields 'unknown'.
command_for_type(?CONNECT) -> connect;
command_for_type(?CONNACK) -> connack;
command_for_type(?PUBLISH) -> publish;
command_for_type(?PUBACK) -> puback;
command_for_type(?PUBREC) -> pubrec;
command_for_type(?PUBREL) -> pubrel;
command_for_type(?PUBCOMP) -> pubcomp;
command_for_type(?SUBSCRIBE) -> subscribe;
command_for_type(?SUBACK) -> suback;
command_for_type(?UNSUBSCRIBE) -> unsubscribe;
command_for_type(?UNSUBACK) -> unsuback;
command_for_type(?PINGREQ) -> pingreq;
command_for_type(?PINGRESP) -> pingresp;
command_for_type(?DISCONNECT) -> disconnect;
command_for_type(_Other) -> unknown.
%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Apply a list of connect options to a #connect_options{} record.
%%
%% Recognised entries are {keepalive, _}, {retry, _}, {client_id, _},
%% {clean_start, _}, {connect_timeout, _}, {username, _}, {password, _}
%% and a bare #will{} record. Entries are applied left to right, so a
%% later entry overrides an earlier one for the same field.
%% Any other entry terminates the caller with
%% exit({connect, unknown_option, Entry}).
set_connect_options([], Options) ->
    Options;
set_connect_options([{keepalive, KeepAlive} | T], Options) ->
    set_connect_options(T, Options#connect_options{keepalive = KeepAlive});
set_connect_options([{retry, Retry} | T], Options) ->
    set_connect_options(T, Options#connect_options{retry = Retry});
set_connect_options([{client_id, ClientId} | T], Options) ->
    set_connect_options(T, Options#connect_options{client_id = ClientId});
set_connect_options([{clean_start, Flag} | T], Options) ->
    set_connect_options(T, Options#connect_options{clean_start = Flag});
set_connect_options([{connect_timeout, Timeout} | T], Options) ->
    set_connect_options(T, Options#connect_options{connect_timeout = Timeout});
set_connect_options([{username, UserName} | T], Options) ->
    set_connect_options(T, Options#connect_options{username = UserName});
set_connect_options([{password, Password} | T], Options) ->
    set_connect_options(T, Options#connect_options{password = Password});
set_connect_options([#will{} = Will | T], Options) ->
    set_connect_options(T, Options#connect_options{will = Will});
set_connect_options([UnknownOption | _T], _Options) ->
    exit({connect, unknown_option, UnknownOption}).

%% Apply a list of publish options to a #publish_options{} record.
%%
%% Recognised entries are {qos, 0 | 1 | 2} and {retain, true | false};
%% the retain boolean is stored as the integer 1 or 0.
%% Any other entry, including an out-of-range or non-integer QoS,
%% terminates the caller with exit({unknown, publish_option, Entry}).
set_publish_options([], Options) ->
    Options;
%% is_integer/1 is required: without it a float such as 1.5 passes the
%% range check and only fails later when the QoS is packed into 2 bits.
set_publish_options([{qos, QoS} | T], Options) when is_integer(QoS), QoS >= 0, QoS =< 2 ->
    set_publish_options(T, Options#publish_options{qos = QoS});
set_publish_options([{retain, true} | T], Options) ->
    set_publish_options(T, Options#publish_options{retain = 1});
set_publish_options([{retain, false} | T], Options) ->
    set_publish_options(T, Options#publish_options{retain = 0});
set_publish_options([UnknownOption | _T], _Options) ->
    exit({unknown, publish_option, UnknownOption}).
%% Assemble a #will{} record from the will fields carried by a CONNECT
%% frame (topic and message from the payload, QoS and retain from the
%% connect flags).
construct_will(Topic, WillMessage, QoS, Retain) ->
    PublishOptions = #publish_options{qos = QoS, retain = Retain},
    #will{topic = Topic, message = WillMessage, publish_options = PublishOptions}.

%% Decode the bytes that follow the fixed header and length field.
%% Message is the #mqtt{} record produced from the fixed header; the
%% result is that record with 'arg' (and, for some types, 'id') filled in.
%% A type with no matching clause terminates the caller with
%% exit({decode_message, unexpected_message, {Message, Rest}}).
decode_message(#mqtt{type = ?CONNECT} = Message, Rest) ->
    <<NameLength:16/big, _/binary>> = Rest,
    %% Variable header = 2-byte name length + name + 1 version byte +
    %% 1 flags byte + 2 keepalive bytes.
    {VariableHeader, Payload} = split_binary(Rest, 2 + NameLength + 4),
    <<_:16, ProtocolName:NameLength/binary, ProtocolVersion:8/big,
      UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2/big,
      WillFlag:1, CleanStart:1, _:1, KeepAlive:16/big>> = VariableHeader,
    %% The flags dictate how many length-prefixed strings the payload
    %% must contain, and in which order.
    {ClientId, Will, Username, Password} =
        case {WillFlag, UsernameFlag, PasswordFlag} of
            {0, 0, 0} ->
                [Id] = decode_strings(Payload),
                {Id, undefined, undefined, undefined};
            {0, 1, 0} ->
                [Id, User] = decode_strings(Payload),
                {Id, undefined, User, undefined};
            {0, 1, 1} ->
                [Id, User, Pass] = decode_strings(Payload),
                {Id, undefined, User, Pass};
            {1, 0, 0} ->
                [Id, WillTopic, WillMsg] = decode_strings(Payload),
                {Id, construct_will(WillTopic, WillMsg, WillQoS, WillRetain),
                 undefined, undefined};
            {1, 1, 0} ->
                [Id, WillTopic, WillMsg, User] = decode_strings(Payload),
                {Id, construct_will(WillTopic, WillMsg, WillQoS, WillRetain),
                 User, undefined};
            {1, 1, 1} ->
                [Id, WillTopic, WillMsg, User, Pass] = decode_strings(Payload),
                {Id, construct_will(WillTopic, WillMsg, WillQoS, WillRetain),
                 User, Pass}
        end,
    ConnectOptions = #connect_options{
        client_id = ClientId,
        protocol_name = binary_to_list(ProtocolName),
        protocol_version = ProtocolVersion,
        clean_start = CleanStart =:= 1,
        will = Will,
        username = Username,
        password = Password,
        keepalive = KeepAlive
    },
    Message#mqtt{arg = ConnectOptions};
decode_message(#mqtt{type = ?CONNACK} = Message, Rest) ->
    <<_:8, ResponseCode:8/big>> = Rest,
    Message#mqtt{arg = ResponseCode};
%% These types carry nothing beyond the fixed header.
decode_message(#mqtt{type = Type} = Message, _Rest)
  when Type =:= ?PINGRESP;
       Type =:= ?PINGREQ;
       Type =:= ?DISCONNECT ->
    Message;
%% QoS 0 PUBLISH has no message id: topic string, then payload.
decode_message(#mqtt{type = ?PUBLISH, qos = 0} = Message, Rest) ->
    {<<TopicLength:16/big>>, _} = split_binary(Rest, 2),
    {TopicField, Payload} = split_binary(Rest, 2 + TopicLength),
    <<_:16, Topic/binary>> = TopicField,
    Message#mqtt{arg = {binary_to_list(Topic), binary_to_list(Payload)}};
%% Any other PUBLISH: topic string, 16-bit message id, then payload.
decode_message(#mqtt{type = ?PUBLISH} = Message, Rest) ->
    {<<TopicLength:16/big>>, _} = split_binary(Rest, 2),
    {HeaderField, Payload} = split_binary(Rest, 4 + TopicLength),
    <<_:16, Topic:TopicLength/binary, MessageId:16/big>> = HeaderField,
    Message#mqtt{
        id = MessageId,
        arg = {binary_to_list(Topic), binary_to_list(Payload)}
    };
%% Acknowledgement-style frames whose whole body is a 16-bit message id.
decode_message(#mqtt{type = Type} = Message, Rest)
  when Type =:= ?PUBACK;
       Type =:= ?PUBREC;
       Type =:= ?PUBREL;
       Type =:= ?PUBCOMP;
       Type =:= ?UNSUBACK ->
    <<MessageId:16/big>> = Rest,
    Message#mqtt{arg = MessageId};
decode_message(#mqtt{type = ?SUBSCRIBE} = Message, Rest) ->
    {<<MessageId:16/big>>, TopicBytes} = split_binary(Rest, 2),
    Message#mqtt{id = MessageId, arg = decode_subs(TopicBytes, [])};
decode_message(#mqtt{type = ?SUBACK} = Message, Rest) ->
    {<<MessageId:16/big>>, Payload} = split_binary(Rest, 2),
    %% One byte per subscription; the granted QoS is its low two bits.
    GrantedQoS = [QoS || <<_:6, QoS:2/big>> <= Payload],
    Message#mqtt{arg = {MessageId, GrantedQoS}};
decode_message(#mqtt{type = ?UNSUBSCRIBE} = Message, Rest) ->
    {<<MessageId:16/big>>, Payload} = split_binary(Rest, 2),
    Subs = [#sub{topic = Topic} || Topic <- decode_strings(Payload)],
    Message#mqtt{id = MessageId, arg = {MessageId, Subs}};
decode_message(Message, Rest) ->
    exit({decode_message, unexpected_message, {Message, Rest}}).
%% Decode the payload of a SUBSCRIBE frame: a sequence of entries, each a
%% length-prefixed topic followed by one byte whose low two bits are the
%% requested QoS. Returns the #sub{} records in wire order.
decode_subs(<<>>, Subs) ->
    lists:reverse(Subs);
decode_subs(Bytes, Subs) ->
    <<TopicLength:16/big, Topic:TopicLength/binary, ?UNUSED:6, QoS:2/big, Rest/binary>> = Bytes,
    decode_subs(Rest, [#sub{topic = binary_to_list(Topic), qos = QoS} | Subs]).

%% Encode the type-specific part of a message.
%% Returns {VariableHeader, Payload}, both binaries. A message that
%% matches no clause terminates the caller with
%% exit({encode_message, unknown_type, Message}).
encode_message(#mqtt{type = ?CONNACK, arg = ReturnCode}) ->
    {<<?UNUSED:8, ReturnCode:8/big>>, <<>>};
encode_message(#mqtt{type = ?CONNECT, arg = Options}) ->
    CleanStart =
        case Options#connect_options.clean_start of
            true -> 1;
            false -> 0
        end,
    {WillFlag, WillQoS, WillRetain, WillFields} =
        encode_connect_will(Options#connect_options.will),
    {UserNameFlag, PasswordFlag, CredentialFields} =
        encode_connect_credentials(Options#connect_options.username,
                                   Options#connect_options.password),
    VariableHeader = list_to_binary([
        encode_string(Options#connect_options.protocol_name),
        <<(Options#connect_options.protocol_version)/big>>,
        <<UserNameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2/big,
          WillFlag:1, CleanStart:1, ?UNUSED:1,
          (Options#connect_options.keepalive):16/big>>
    ]),
    %% Payload order: client id, will topic, will message, user name,
    %% password. Absent fields contribute an empty list.
    Payload = list_to_binary([
        encode_string(Options#connect_options.client_id),
        WillFields,
        CredentialFields
    ]),
    {VariableHeader, Payload};
encode_message(#mqtt{type = ?PUBLISH, arg = {Topic, Payload}} = Message) ->
    if
        Message#mqtt.qos =:= 0 ->
            %% QoS 0 frames carry no message id.
            {encode_string(Topic), list_to_binary(Payload)};
        Message#mqtt.qos > 0 ->
            {list_to_binary([encode_string(Topic), <<(Message#mqtt.id):16/big>>]),
             list_to_binary(Payload)}
    end;
encode_message(#mqtt{type = ?SUBSCRIBE, arg = Subs} = Message) ->
    %% list_to_binary/1 accepts nested lists, so no flattening is needed.
    Entries = lists:map(
        fun({sub, Topic, RequestedQoS}) ->
            [encode_string(Topic), <<?UNUSED:6, RequestedQoS:2/big>>]
        end,
        Subs),
    {<<(Message#mqtt.id):16/big>>, list_to_binary(Entries)};
encode_message(#mqtt{type = ?SUBACK, arg = {MessageId, Subs}}) ->
    Granted = [<<?UNUSED:6, (S#sub.qos):2/big>> || S <- Subs],
    {<<MessageId:16/big>>, list_to_binary(Granted)};
encode_message(#mqtt{type = ?UNSUBSCRIBE, arg = Subs} = Message) ->
    Topics = lists:map(fun({sub, T, _Q}) -> encode_string(T) end, Subs),
    {<<(Message#mqtt.id):16/big>>, list_to_binary(Topics)};
%% Frames whose variable header is only the message id held in 'arg'.
encode_message(#mqtt{type = Type, arg = MessageId})
  when Type =:= ?PUBACK;
       Type =:= ?PUBREC;
       Type =:= ?PUBREL;
       Type =:= ?PUBCOMP;
       Type =:= ?UNSUBACK ->
    {<<MessageId:16/big>>, <<>>};
%% Frames with neither variable header nor payload.
encode_message(#mqtt{type = Type})
  when Type =:= ?PINGREQ;
       Type =:= ?PINGRESP;
       Type =:= ?DISCONNECT ->
    {<<>>, <<>>};
encode_message(#mqtt{} = Message) ->
    exit({encode_message, unknown_type, Message}).

%% Will part of a CONNECT frame.
%% Returns {WillFlag, WillQoS, WillRetain, EncodedFields}. A missing will,
%% or one whose topic is undefined or "", is encoded as "no will".
encode_connect_will(undefined) ->
    {0, 0, 0, []};
encode_connect_will(#will{topic = undefined}) ->
    {0, 0, 0, []};
encode_connect_will(#will{topic = ""}) ->
    {0, 0, 0, []};
encode_connect_will(#will{topic = WillTopic, message = WillMessage, publish_options = WillOptions}) ->
    {1,
     WillOptions#publish_options.qos,
     WillOptions#publish_options.retain,
     [encode_string(WillTopic), encode_string(WillMessage)]}.

%% Credential part of a CONNECT frame.
%% Returns {UserNameFlag, PasswordFlag, EncodedFields}.
%% A password is only sent together with a user name. Without a user name
%% the password flag is forced to 0 and the password is dropped, so the
%% flags always agree with the fields actually written to the payload.
encode_connect_credentials(undefined, _Password) ->
    {0, 0, []};
encode_connect_credentials(UserName, undefined) ->
    {1, 0, [encode_string(UserName)]};
encode_connect_credentials(UserName, Password) ->
    {1, 1, [encode_string(UserName), encode_string(Password)]}.
%% Decode the variable-length "remaining length" field at the front of
%% Data. Each byte contributes its low 7 bits, least-significant group
%% first; a set top bit means another length byte follows.
%% Returns {Length, Rest}, or 'more' when Data ends before the final
%% length byte (top bit clear) has been seen.
decode_length(Data) ->
    decode_length(Data, 1, 0).

%% Running out of input mid-field means the frame is incomplete; it must
%% not be reported as a zero-length frame.
decode_length(<<>>, _Multiplier, _Value) ->
    more;
decode_length(<<0:1, Length:7, Rest/binary>>, Multiplier, Value) ->
    {Value + Multiplier * Length, Rest};
decode_length(<<1:1, Length:7, Rest/binary>>, Multiplier, Value) ->
    decode_length(Rest, Multiplier * 128, Value + Multiplier * Length).

%% Encode a non-negative integer as a variable-length "remaining length"
%% field: 7 bits per byte, least-significant group first, top bit set on
%% every byte except the last.
encode_length(Length) ->
    encode_length(Length, <<>>).

encode_length(Length, Buff) when Length div 128 > 0 ->
    Digit = Length rem 128,
    encode_length(Length div 128, <<Buff/binary, 1:1, Digit:7/big>>);
encode_length(Length, Buff) ->
    Digit = Length rem 128,
    <<Buff/binary, 0:1, Digit:7/big>>.

%% Pack the type, dup, qos and retain fields of an #mqtt{} record into
%% the single fixed-header byte (4 + 1 + 2 + 1 bits).
encode_fixed_header(#mqtt{type = Type, dup = Dup, qos = QoS, retain = Retain}) ->
    <<Type:4/big, Dup:1, QoS:2/big, Retain:1>>.

%% Unpack a one-byte binary fixed header into an #mqtt{} record with the
%% type, dup, qos and retain fields set.
decode_fixed_header(Byte) ->
    <<Type:4/big, Dup:1, QoS:2/big, Retain:1>> = Byte,
    #mqtt{type = Type, dup = Dup, qos = QoS, retain = Retain}.

%% Encode a string as a 16-bit big-endian byte count followed by the
%% bytes. Accepts a list of bytes, a binary or an iolist.
%% NOTE: the length segment is 16 bits wide, so a length above 65535 is
%% truncated by the bit syntax; callers must keep strings below that.
encode_string(String) ->
    Bytes = iolist_to_binary(String),
    Length = byte_size(Bytes),
    <<Length:16/big, Bytes/binary>>.

%% Decode a binary made up solely of length-prefixed strings into a list
%% of strings (lists of bytes), in wire order.
decode_strings(Bytes) when is_binary(Bytes) ->
    decode_strings(Bytes, []).

decode_strings(<<>>, Strings) ->
    lists:reverse(Strings);
decode_strings(<<Length:16/big, _/binary>> = Bytes, Strings) ->
    <<_:16, Binary:Length/binary, Rest/binary>> = Bytes,
    decode_strings(Rest, [binary_to_list(Binary) | Strings]).