1%% The MIT License (MIT)
2%%
3%% Copyright (c) <2013> <hellomatty@gmail.com>
4%%
5%% Permission is hereby granted, free of charge, to any person obtaining a copy
6%% of this software and associated documentation files (the "Software"), to deal
7%% in the Software without restriction, including without limitation the rights
8%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9%% copies of the Software, and to permit persons to whom the Software is
10%% furnished to do so, subject to the following conditions:
11%%
12%% The above copyright notice and this permission notice shall be included in
13%% all copies or substantial portions of the Software.
14%%
15%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21%% THE SOFTWARE.
22
23%% Modified from: https://code.google.com/p/my-mqtt4erl/source/browse/src/mqtt_core.erl.
24-module(mqtt_frame).
25-author('hellomatty@gmail.com').
26
27%%
28%% An erlang client for MQTT (http://www.mqtt.org/)
29%%
30
31-include_lib("mqtt.hrl").
32
33-export([encode/1, decode/1]).
34-export([set_connect_options/1, set_publish_options/1, command_for_type/1]).
35
36%%%===================================================================
37%%% API functions
38%%%===================================================================
39encode(#mqtt{} = Message) ->
40  {VariableHeader, Payload} = encode_message(Message),
41  FixedHeader = encode_fixed_header(Message),
42  EncodedLength = encode_length(size(VariableHeader) + size(Payload)),
43  <<FixedHeader/binary, EncodedLength/binary, VariableHeader/binary, Payload/binary>>.
44
45decode(<<FixedHeader:8/big, Rest/binary>>) ->
46  case decode_length(Rest) of
47    more ->
48      more;
49    {RemainingLength, Rest1} ->
50      Size = size(Rest1),
51      if
52        Size >= RemainingLength ->
53          <<Body:RemainingLength/binary-unit:8, Left/binary>> = Rest1,
54          {decode_message(decode_fixed_header(<<FixedHeader>>), Body), Left};
55        true -> more
56      end
57  end;
58decode(_Data) ->
59  more.
60
61set_connect_options(Options) ->
62    set_connect_options(Options, #connect_options{}).
63
64set_publish_options(Options) ->
65  set_publish_options(Options, #publish_options{}).
66
67command_for_type(Type) ->
68  case Type of
69    ?CONNECT -> connect;
70    ?CONNACK -> connack;
71    ?PUBLISH -> publish;
72    ?PUBACK  -> puback;
73    ?PUBREC -> pubrec;
74    ?PUBREL -> pubrel;
75    ?PUBCOMP -> pubcomp;
76    ?SUBSCRIBE -> subscribe;
77    ?SUBACK -> suback;
78    ?UNSUBSCRIBE -> unsubscribe;
79    ?UNSUBACK -> unsuback;
80    ?PINGREQ -> pingreq;
81    ?PINGRESP -> pingresp;
82    ?DISCONNECT -> disconnect;
83    _ -> unknown
84  end.
85
86%%%===================================================================
87%%% Internal functions
88%%%===================================================================
89set_connect_options([], Options) ->
90    Options;
91set_connect_options([{keepalive, KeepAlive}|T], Options) ->
92    set_connect_options(T, Options#connect_options{keepalive = KeepAlive});
93set_connect_options([{retry, Retry}|T], Options) ->
94    set_connect_options(T, Options#connect_options{retry = Retry});
95set_connect_options([{client_id, ClientId}|T], Options) ->
96    set_connect_options(T, Options#connect_options{client_id = ClientId});
97set_connect_options([{clean_start, Flag}|T], Options) ->
98    set_connect_options(T, Options#connect_options{clean_start = Flag});
99set_connect_options([{connect_timeout, Timeout}|T], Options) ->
100    set_connect_options(T, Options#connect_options{connect_timeout = Timeout});
101set_connect_options([{username, UserName}|T], Options) ->
102    set_connect_options(T, Options#connect_options{username = UserName});
103set_connect_options([{password, Password}|T], Options) ->
104    set_connect_options(T, Options#connect_options{password = Password});
105set_connect_options([#will{} = Will|T], Options) ->
106    set_connect_options(T, Options#connect_options{will = Will});
107set_connect_options([UnknownOption|_T], _Options) ->
108    exit({connect, unknown_option, UnknownOption}).
109
110set_publish_options([], Options) ->
111  Options;
112set_publish_options([{qos, QoS}|T], Options) when QoS >= 0, QoS =< 2 ->
113  set_publish_options(T, Options#publish_options{qos = QoS});
114set_publish_options([{retain, true}|T], Options) ->
115  set_publish_options(T, Options#publish_options{retain = 1});
116set_publish_options([{retain, false}|T], Options) ->
117  set_publish_options(T, Options#publish_options{retain = 0});
118set_publish_options([UnknownOption|_T], _Options) ->
119  exit({unknown, publish_option, UnknownOption}).
120
121construct_will(WT, WM, WillQoS, WillRetain) ->
122    #will{
123        topic = WT,
124        message = WM,
125        publish_options = #publish_options{qos = WillQoS, retain = WillRetain}
126      }.
127
128decode_message(#mqtt{type = ?CONNECT} = Message, Rest) ->
129  <<ProtocolNameLength:16/big, _/binary>> = Rest,
130  {VariableHeader, Payload} = split_binary(Rest, 2 + ProtocolNameLength + 4),
131  <<_:16, ProtocolName:ProtocolNameLength/binary, ProtocolVersion:8/big, UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2/big, WillFlag:1, CleanStart:1, _:1, KeepAlive:16/big>> = VariableHeader,
132  {ClientId, Will, Username, Password} = case {WillFlag, UsernameFlag, PasswordFlag} of
133    {1, 0, 0} ->
134      [C, WT, WM] = decode_strings(Payload),
135      W = construct_will(WT, WM, WillQoS, WillRetain),
136      {C, W, undefined, undefined};
137    {1, 1, 0} ->
138      [C, WT, WM, U] = decode_strings(Payload),
139      W = construct_will(WT, WM, WillQoS, WillRetain),
140      {C, W, U, undefined};
141    {1, 1, 1} ->
142      [C, WT, WM, U, P] = decode_strings(Payload),
143      W = construct_will(WT, WM, WillQoS, WillRetain),
144      {C, W, U, P};
145    {0, 1, 0} ->
146      [C, U] = decode_strings(Payload),
147      {C, undefined, U, undefined};
148    {0, 1, 1} ->
149      [C, U, P] = decode_strings(Payload),
150      {C, undefined, U, P};
151    {0, 0, 0} ->
152      [C] = decode_strings(Payload),
153      {C, undefined, undefined, undefined}
154  end,
155  Message#mqtt{
156    arg = #connect_options{
157      client_id = ClientId,
158      protocol_name = binary_to_list(ProtocolName),
159      protocol_version = ProtocolVersion,
160      clean_start = CleanStart =:= 1,
161      will = Will,
162      username = Username,
163      password = Password,
164      keepalive = KeepAlive
165    }
166  };
167decode_message(#mqtt{type = ?CONNACK} = Message, Rest) ->
168  <<_:8, ResponseCode:8/big>> = Rest,
169  Message#mqtt{arg = ResponseCode};
170decode_message(#mqtt{type = ?PINGRESP} = Message, _Rest) ->
171  Message;
172decode_message(#mqtt{type = ?PINGREQ} = Message, _Rest) ->
173  Message;
174decode_message(#mqtt{type = ?PUBLISH, qos = 0} = Message, Rest) ->
175  {<<TopicLength:16/big>>, _} = split_binary(Rest, 2),
176  {<<_:16, Topic/binary>>, Payload} = split_binary(Rest, 2 + TopicLength),
177  Message#mqtt{
178    arg = {binary_to_list(Topic), binary_to_list(Payload)}
179  };
180decode_message(#mqtt{type = ?PUBLISH} = Message, Rest) ->
181  {<<TopicLength:16/big>>, _} = split_binary(Rest, 2),
182  {<<_:16, Topic:TopicLength/binary, MessageId:16/big>>, Payload} = split_binary(Rest, 4 + TopicLength),
183   Message#mqtt{
184    id = MessageId,
185    arg = {binary_to_list(Topic), binary_to_list(Payload)}
186  };
187decode_message(#mqtt{type = Type} = Message, Rest)
188    when
189      Type =:= ?PUBACK;
190      Type =:= ?PUBREC;
191      Type =:= ?PUBREL;
192      Type =:= ?PUBCOMP ->
193  <<MessageId:16/big>> = Rest,
194  Message#mqtt{
195    arg = MessageId
196  };
197decode_message(#mqtt{type = ?SUBSCRIBE} = Message, Rest) ->
198  {<<MessageId:16/big>>, Payload} = split_binary(Rest, 2),
199  Message#mqtt{
200    id = MessageId,
201    arg = decode_subs(Payload, [])
202  };
203decode_message(#mqtt{type = ?SUBACK} = Message, Rest) ->
204  {<<MessageId:16/big>>, Payload} = split_binary(Rest, 2),
205  GrantedQoS  = lists:map(fun(Item) ->
206      <<_:6, QoS:2/big>> = <<Item>>,
207      QoS
208    end,
209    binary_to_list(Payload)
210  ),
211  Message#mqtt{
212    arg = {MessageId, GrantedQoS}
213  };
214decode_message(#mqtt{type = ?UNSUBSCRIBE} = Message, Rest) ->
215  {<<MessageId:16/big>>, Payload} = split_binary(Rest, 2),
216  Message#mqtt{
217    id = MessageId,
218    arg = {MessageId, lists:map(fun(T) -> #sub{topic = T} end, decode_strings(Payload))}
219  };
220decode_message(#mqtt{type = ?UNSUBACK} = Message, Rest) ->
221  <<MessageId:16/big>> = Rest,
222  Message#mqtt{
223    arg = MessageId
224  };
225decode_message(#mqtt{type = ?DISCONNECT} = Message, _Rest) ->
226  Message;
227decode_message(Message, Rest) ->
228  exit({decode_message, unexpected_message, {Message, Rest}}).
229
230decode_subs(<<>>, Subs) ->
231  lists:reverse(Subs);
232decode_subs(Bytes, Subs) ->
233  <<TopicLength:16/big, _/binary>> = Bytes,
234  <<_:16, Topic:TopicLength/binary, ?UNUSED:6, QoS:2/big, Rest/binary>> = Bytes,
235  decode_subs(Rest, [#sub{topic = binary_to_list(Topic), qos = QoS}|Subs]).
236
237encode_message(#mqtt{type = ?CONNACK, arg = ReturnCode}) ->
238  {<<?UNUSED:8, ReturnCode:8/big>>,<<>>};
239encode_message(#mqtt{type = ?CONNECT, arg = Options}) ->
240  CleanStart = case Options#connect_options.clean_start of
241    true ->
242      1;
243    false ->
244      0
245  end,
246  {UserNameFlag, UserNameValue} = case Options#connect_options.username of
247    undefined ->
248      {0, undefined};
249    UserName ->
250      {1, UserName}
251  end,
252  {PasswordFlag, PasswordValue} = case Options#connect_options.password of
253    undefined ->
254      {0, undefined};
255    Password ->
256      {1, Password}
257  end,
258  {WillFlag, WillQoS, WillRetain, PayloadList} =
259        case Options#connect_options.will of
260            #will{ topic = undefined } ->
261                {0, 0, 0, [encode_string(Options#connect_options.client_id)]};
262            #will{ topic = "" } ->
263                {0, 0, 0, [encode_string(Options#connect_options.client_id)]};
264            undefined ->
265                {0, 0, 0, [encode_string(Options#connect_options.client_id)]};
266            #will{ topic = WillTopic, message = WillMessage, publish_options = WillOptions } ->
267                {1,
268                 WillOptions#publish_options.qos,
269                 WillOptions#publish_options.retain,
270                 [encode_string(Options#connect_options.client_id),
271                  encode_string(WillTopic),
272                  encode_string(WillMessage)]
273                }
274        end,
275  Payload1 = case UserNameValue of
276    undefined -> list_to_binary(PayloadList);
277    _ ->
278      case PasswordValue of
279        undefined -> list_to_binary(lists:append(PayloadList, [encode_string(UserNameValue)]));
280        _ -> list_to_binary(lists:append(PayloadList, [encode_string(UserNameValue), encode_string(PasswordValue)]))
281      end
282    end,
283  {
284    list_to_binary([
285      encode_string(Options#connect_options.protocol_name),
286      <<(Options#connect_options.protocol_version)/big>>,
287      <<UserNameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2/big, WillFlag:1, CleanStart:1, ?UNUSED:1, (Options#connect_options.keepalive):16/big>>
288    ]),
289    Payload1
290  };
291encode_message(#mqtt{type = ?PUBLISH, arg = {Topic, Payload}} = Message) ->
292  if
293    Message#mqtt.qos =:= 0 ->
294        {
295          encode_string(Topic),
296          list_to_binary(Payload)
297        };
298    Message#mqtt.qos > 0 ->
299        {
300          list_to_binary([encode_string(Topic), <<(Message#mqtt.id):16/big>>]),
301          list_to_binary(Payload)
302        }
303  end;
304encode_message(#mqtt{type = ?PUBACK, arg = MessageId}) ->
305  {
306    <<MessageId:16/big>>,
307    <<>>
308  };
309encode_message(#mqtt{type = ?SUBSCRIBE, arg = Subs} = Message) ->
310  {
311    <<(Message#mqtt.id):16/big>>,
312    list_to_binary( lists:flatten( lists:map(fun({sub, Topic, RequestedQoS}) -> [encode_string(Topic), <<?UNUSED:6, RequestedQoS:2/big>>] end, Subs)))
313  };
314encode_message(#mqtt{type = ?SUBACK, arg = {MessageId, Subs}}) ->
315  {
316    <<MessageId:16/big>>,
317    list_to_binary(lists:map(fun(S) -> <<?UNUSED:6, (S#sub.qos):2/big>> end, Subs))
318  };
319encode_message(#mqtt{type = ?UNSUBSCRIBE, arg = Subs} = Message) ->
320  {
321    <<(Message#mqtt.id):16/big>>,
322    list_to_binary(lists:map(fun({sub, T, _Q}) -> encode_string(T) end, Subs))
323  };
324encode_message(#mqtt{type = ?UNSUBACK, arg = MessageId}) ->
325  {<<MessageId:16/big>>, <<>>};
326encode_message(#mqtt{type = ?PINGREQ}) ->
327  {<<>>, <<>>};
328encode_message(#mqtt{type = ?PINGRESP}) ->
329  {<<>>, <<>>};
330encode_message(#mqtt{type = ?PUBREC, arg = MessageId}) ->
331  {<<MessageId:16/big>>, <<>>};
332encode_message(#mqtt{type = ?PUBREL, arg = MessageId}) ->
333  {<<MessageId:16/big>>, <<>>};
334encode_message(#mqtt{type = ?PUBCOMP, arg = MessageId}) ->
335  {<<MessageId:16/big>>, <<>>};
336encode_message(#mqtt{type = ?DISCONNECT}) ->
337  {<<>>, <<>>};
338encode_message(#mqtt{} = Message) ->
339  exit({encode_message, unknown_type, Message}).
340
341decode_length(<<>>) -> more;
342decode_length(Data) ->
343  decode_length(Data, 1, 0).
344decode_length(<<>>, Multiplier, Value) ->
345  {0, <<>>};
346decode_length(<<0:1, Length:7, Rest/binary>>, Multiplier, Value) ->
347  {Value + Multiplier * Length, Rest};
348decode_length(<<1:1, Length:7, Rest/binary>>, Multiplier, Value) ->
349  decode_length(Rest, Multiplier * 128, Value + Multiplier * Length).
350
351encode_length(Length) ->
352  encode_length(Length, <<>>).
353
354encode_length(Length, Buff) when Length div 128 > 0 ->
355  Digit = Length rem 128,
356  Current = <<1:1, Digit:7/big>>,
357  encode_length(Length div 128, <<Buff/binary, Current/binary>>);
358encode_length(Length, Buff) ->
359  Digit = Length rem 128,
360  Current = <<0:1, Digit:7/big>>,
361  <<Buff/binary, Current/binary>>.
362
363encode_fixed_header(Message) when is_record(Message, mqtt) ->
364  <<(Message#mqtt.type):4/big, (Message#mqtt.dup):1, (Message#mqtt.qos):2/big, (Message#mqtt.retain):1>>.
365
366decode_fixed_header(Byte) ->
367  <<Type:4/big, Dup:1, QoS:2/big, Retain:1>> = Byte,
368  #mqtt{type = Type, dup = Dup, qos = QoS, retain = Retain}.
369
370encode_string(String) ->
371  Bytes = list_to_binary(String),
372  Length = size(Bytes),
373  <<Length:16/big, Bytes/binary>>.
374
375decode_strings(Bytes) when is_binary(Bytes) ->
376  decode_strings(Bytes, []).
377decode_strings(<<>>, Strings) ->
378  lists:reverse(Strings);
379decode_strings(<<Length:16/big, _/binary>> = Bytes, Strings) ->
380  <<_:16, Binary:Length/binary, Rest/binary>> = Bytes,
381  decode_strings(Rest, [binary_to_list(Binary)|Strings]).
382