1# Copyright 2016 Apcera Inc. All rights reserved.
2defmodule Nats.Parser do
3
4  @default_state %{ps: :verb, lexs: [], msg: nil, size: nil, verb: nil}
5  def init do
6    @default_state
7  end
8
9  defp parse_json(state, rest, verb, str) do
10    case :json_lexer.string(str) do
11      {:ok, tokens, _} ->
12        pres = :json_parser.parse(tokens)
13        case pres do
14          {:ok, json } when is_map(json) -> {:ok, {verb, json}, rest, state}
15          {:ok, json } -> parse_err(state, "not a json object in #{verb}", json)
16          {:error, {_, what, mesg}} -> parse_err(state, "invalid json in #{verb}", "#{what}: #{mesg}")
17          other -> parse_err(state, "unexpected json parser result in #{verb}", other)
18        end
19      {:eof, _} -> parse_err(state, "json not complete in #{verb}")
20      {:error, {_, why, mesg}} -> parse_err(state, "invalid json tokens in #{verb}", [why, mesg])
21      # safe programming ;-)
22      other -> parse_err(state, "unexpected json lexer result for json in #{verb}", other)
23    end
24  end
25
26  defp parse_err(state, mesg) do
27    {:error, "NATS: parsing error: #{mesg}", %{state | ps: :error}}
28  end
29  defp parse_err(state, mesg, what) do
30    parse_err(state, "#{mesg}: #{inspect(what)}")
31  end
32
  # CRLF terminator: appended to every message by encode/1 and placed between
  # the header line and the payload of encoded MSG/PUB messages.
  @endverb "\r\n"
34
35  def parse(string) do parse(nil, string) end
36  def parse(nil, string) do parse(@default_state, string) end
37
38# @doc """
39#  Parse a the NATS protocol from the given `stream`.
40#
41#  Returns {:ok, message, rest} if a message is parsed from the passed stream,
42#  or {:cont, fn } if the message is incomplete.
43#
44#  ## Examples
45#
46#  iex>  Nats.Protocol.parse("-ERROR foo\r\n+OK\r")
47#  {:ok, {:error, "foo"}, state}
48#  iex>  Nats.Protocol.parse("+OK\r")
49#  {:cont, ... }
50#  """
51
52  def parse(state = %{ps: :verb, lexs: ls}, thing) do
53    res = :nats_lexer.tokens(ls, to_char_list(thing))
54    #IO.puts "lex got: #{inspect(nls)}"
55    case res do
56      {:done, {:ok, tokens, _}, rest} -> parse_verb(state, tokens, to_string(rest))
57      {:done, {:eof, _}} -> parse_err(state, "message not complete")
58      {:more, nls} -> {:cont, 0, %{state | lexs: nls}}
59      other -> parse_err(state, "unexpected lexer return", other)
60    end
61  end
62
63  defp parse_verb(state, tokens, rest) do  # when is_list(thing) do
64    pres = :nats_parser.parse(tokens)
65    case pres do
66      {:ok, {:info, str}} -> parse_json(state, rest, :info, str)
67      {:ok, {:connect, str}} -> parse_json(state, rest, :connect, str)
68      {:ok, verb = {:msg, _, _, _, len}} ->
69        parse_body(%{state | ps: :body, size: len + 2, msg: <<>>, verb: verb},
70                   rest)
71      {:ok, verb = {:pub, _, _, len}} ->
72        parse_body(%{state | ps: :body, size: len + 2, msg: <<>>, verb: verb},
73                   rest)
74      {:ok, verb } -> {:ok, verb, rest, state}
75      {:error, {_, _, mesg}} -> parse_err(state, "invalid message", mesg)
76      other -> parse_err(state, "unexpected parser return", other)
77    end
78  end
79
80  # We've parsed the whole body. There may be remaining bytes left in rest
81  # so make sure we return them..
82  defp parse_body(state = %{ps: :body, size: 0, msg: bd, verb: v}, rest) do
83    # replace the last value in the verb with the body.
84    tsz = tuple_size(v) - 1
85    body_size = elem(v, tsz)
86    part = binary_part(bd, body_size, 2)
87    #  IO.puts "PART -> #{inspect(part)}"
88    if part != "\r\n" do
89      parse_err(state, "missing body trailer for #{inspect(v)}", part)
90    else
91      {:ok, put_elem(v, tsz, binary_part(bd, 0, body_size)),
92       rest, @default_state}
93    end
94  end
95
96  # We've run out of anything to parse and are still looking for a body
97  # let the parser's caller know we want more data...
98  defp parse_body(state = %{ps: :body, size: sz}, <<>>) do
99    {:cont, sz, state}
100  end
101
102  # We have more data to read in our body AND there is data to be read, yeah!!!
103  # See how much we can slurp up
104  defp parse_body(state = %{ps: :body, size: sz, msg: body}, rest) do
105    # rest = :erlang.list_to_binary(rest)
106    rest_size = byte_size(rest)
107    to_read = min(sz, rest_size)
108    <<read :: binary-size(to_read), remainder::binary>> = rest
109    # IO.puts "reading (#{to_read}): #{inspect(read)}: rest=#{inspect(remainder)}"
110    parse_body(%{state | size: sz - to_read, msg: <<body <> read>>}, remainder)
111  end
112
113  def to_json(false) do <<"false">> end
114  def to_json(true) do <<"true">> end
115  def to_json(nil) do <<"null">> end
116  def to_json(n) when is_number(n) do <<"#{n}">> end
117  def to_json(str) when is_binary(str) do <<?\">> <> str <> <<?\">> end
118  def to_json(map) when is_map(map) do
119    <<?\{>> <>
120      Enum.join(Enum.map(map, fn({k,v}) -> member_pair(k,v) end), ", ") <>
121    <<?}>>
122  end
123  def to_json(array) when is_list(array) do
124    <<?\[>> <>
125      Enum.join(Enum.map(array, fn(x) -> to_json(x) end), ", ") <>
126    <<?\]>>
127  end
128  defp member_pair(k,v) when is_binary(k) do
129    to_json(k) <> <<": ">> <> to_json(v)
130  end
131  def encode(mesg) do
132    encode1(mesg) <> @endverb
133  end
134  defp encode1({:ok}) do <<"+OK">> end
135  defp encode1({:ping}) do <<"PING">> end
136  defp encode1({:pong}) do <<"PONG">> end
137  defp encode1({:err, msg}) do <<"-ERR ">> <> msg end
138  defp encode1({:info, json}) do <<"INFO ">> <> to_json(json) end
139  defp encode1({:connect, json}) do <<"CONNECT ">> <> to_json(json) end
140  defp encode1({:msg, sub, sid, nil, what}) do
141    <<"MSG ">> <> sub <>
142      <<32>> <> sid <>
143      <<32>> <> to_string(byte_size(what)) <> @endverb <> what
144  end
145  defp encode1({:msg, sub, sid, ret, what}) do
146    <<"MSG ">> <> sub <>
147      <<32>> <> sid <>
148      <<32>> <> ret <>
149      <<32>> <> to_string(byte_size(what)) <> @endverb <> what
150  end
151  defp encode1({:pub, sub, nil, what}) do
152    <<"PUB ">> <> sub <>
153      <<32>> <> to_string(byte_size(what)) <> @endverb <> what
154  end
155  defp encode1({:pub, sub, reply, what}) do
156    <<"PUB ">> <> sub <>
157      <<32>> <> reply <>
158      <<32>> <> to_string(byte_size(what)) <> @endverb <> what
159  end
160  defp encode1({:sub, subject, nil, sid}) do
161    <<"SUB ">> <> subject <> <<32>> <> sid
162  end
163  defp encode1({:sub, subject, queue, sid}) do
164    <<"SUB ">> <> subject <> <<32>> <> queue <> <<32>> <> sid
165  end
166end
167