# Copyright 2016 Apcera Inc. All rights reserved.
defmodule Nats.Parser do
  @moduledoc """
  Incremental parser and encoder for NATS protocol messages.

  Parsing is driven by a small state map (see `init/0`). The control line of
  each message is tokenized by `:nats_lexer` and parsed by `:nats_parser`;
  the JSON argument of `INFO` / `CONNECT` goes through `:json_lexer` and
  `:json_parser`. Message bodies (`MSG` / `PUB`) are read as raw bytes.

  `encode/1` performs the reverse operation, turning a message tuple into the
  binary wire format.
  """

  # ps:   parser phase, one of :verb (reading a control line), :body
  #       (reading a payload) or :error (set by parse_err/2)
  # lexs: lexer continuation handed back to :nats_lexer.tokens/2
  # msg:  body bytes accumulated so far (payload plus trailing CRLF)
  # size: number of body bytes (including the trailing CRLF) still missing
  # verb: the parsed control line whose body is being read
  @default_state %{ps: :verb, lexs: [], msg: nil, size: nil, verb: nil}

  # Terminator of every control line and of every body.
  @endverb "\r\n"

  @doc """
  Returns a fresh parser state, ready to read the start of a message.
  """
  def init do
    @default_state
  end

  @doc """
  Parses `string` starting from a fresh parser state.

  Equivalent to `parse(nil, string)`; see `parse/2` for the return values.
  """
  def parse(string) do
    parse(nil, string)
  end

  @doc """
  Parses the next NATS message from `thing`, continuing from `state`.

  `state` is either `nil` (start from scratch) or a state previously returned
  by this function.

  Returns one of:

    * `{:ok, message, rest, state}` - a complete message was parsed; `rest`
      is the unconsumed input as a binary and `state` is the state to use
      for the next call.
    * `{:cont, needed, state}` - more input is required. `needed` is the
      number of body bytes (trailing CRLF included) still missing, or `0`
      when the control line itself is incomplete. Call `parse/2` again with
      `state` and the next chunk of input.
    * `{:error, reason, state}` - the input is invalid. `reason` is a
      descriptive string and the returned state has `ps: :error`; no clause
      of `parse/2` accepts such a state, so it must not be reused.
  """
  def parse(nil, string) do
    parse(@default_state, string)
  end

  def parse(state = %{ps: :verb, lexs: ls}, thing) do
    # NOTE(review): the whole input, body bytes included, is converted to a
    # charlist here and the remainder is converted back with to_string/1.
    # Presumably payloads are valid UTF-8 - confirm, since to_charlist/1
    # raises on binaries that are not.
    case :nats_lexer.tokens(ls, to_charlist(thing)) do
      {:done, {:ok, tokens, _}, rest} ->
        # The lexer is finished with this control line: drop its continuation
        # so the state we hand back does not replay already-consumed input
        # on the next call.
        parse_verb(%{state | lexs: []}, tokens, to_string(rest))

      # leex reports results as {:done, result, rest}; eof is no exception.
      {:done, {:eof, _}, _rest} ->
        parse_err(state, "message not complete")

      {:more, nls} ->
        {:cont, 0, %{state | lexs: nls}}

      other ->
        parse_err(state, "unexpected lexer return", other)
    end
  end

  # Resume reading a body after an earlier call returned {:cont, n, state}
  # from the middle of a payload.
  def parse(state = %{ps: :body}, thing) do
    parse_body(state, to_string(thing))
  end

  @doc """
  Encodes the message tuple `mesg` into its NATS wire format, including the
  terminating CRLF.
  """
  def encode(mesg) do
    encode1(mesg) <> @endverb
  end

  @doc """
  Serializes `false`, `true`, `nil`, numbers, binaries, maps and lists to a
  JSON binary.

  Strings are escaped (quotes, backslashes and control characters). Map keys
  must be binaries or atoms; atom keys are emitted as strings.
  """
  def to_json(false), do: "false"
  def to_json(true), do: "true"
  def to_json(nil), do: "null"
  def to_json(n) when is_number(n), do: "#{n}"
  def to_json(str) when is_binary(str), do: <<?">> <> escape_string(str) <> <<?">>

  def to_json(map) when is_map(map) do
    "{" <> Enum.map_join(map, ", ", fn {k, v} -> member_pair(k, v) end) <> "}"
  end

  def to_json(array) when is_list(array) do
    "[" <> Enum.map_join(array, ", ", &to_json/1) <> "]"
  end

  # Renders a single `"key": value` member of a JSON object.
  defp member_pair(k, v) when is_binary(k) do
    to_json(k) <> ": " <> to_json(v)
  end

  defp member_pair(k, v) when is_atom(k) do
    member_pair(Atom.to_string(k), v)
  end

  # Escapes the bytes that may not appear verbatim inside a JSON string.
  # Bytes >= 0x80 (UTF-8 continuation/lead bytes) pass through untouched.
  defp escape_string(str) do
    for <<byte <- str>>, into: <<>>, do: escape_byte(byte)
  end

  defp escape_byte(?"), do: "\\\""
  defp escape_byte(?\\), do: "\\\\"
  defp escape_byte(?\n), do: "\\n"
  defp escape_byte(?\r), do: "\\r"
  defp escape_byte(?\t), do: "\\t"
  defp escape_byte(?\b), do: "\\b"
  defp escape_byte(?\f), do: "\\f"

  defp escape_byte(byte) when byte < 0x20 do
    # Remaining control characters use the generic \uXXXX form.
    IO.iodata_to_binary(:io_lib.format("\\u~4.16.0B", [byte]))
  end

  defp escape_byte(byte), do: <<byte>>

  # Builds the error result and marks the state as failed.
  defp parse_err(state, mesg) do
    {:error, "NATS: parsing error: #{mesg}", %{state | ps: :error}}
  end

  # Same as parse_err/2, with the offending term appended to the message.
  defp parse_err(state, mesg, what) do
    parse_err(state, "#{mesg}: #{inspect(what)}")
  end

  # Lexes and parses the JSON argument `str` of an INFO / CONNECT message.
  # Only a JSON object (decoded as a map) is accepted.
  defp parse_json(state, rest, verb, str) do
    case :json_lexer.string(str) do
      {:ok, tokens, _} ->
        case :json_parser.parse(tokens) do
          {:ok, json} when is_map(json) ->
            {:ok, {verb, json}, rest, state}

          {:ok, json} ->
            parse_err(state, "not a json object in #{verb}", json)

          {:error, {_, what, mesg}} ->
            parse_err(state, "invalid json in #{verb}", "#{what}: #{mesg}")

          other ->
            parse_err(state, "unexpected json parser result in #{verb}", other)
        end

      {:eof, _} ->
        parse_err(state, "json not complete in #{verb}")

      {:error, {_, why, mesg}} ->
        parse_err(state, "invalid json tokens in #{verb}", [why, mesg])

      # Defensive catch-all for any other lexer result.
      other ->
        parse_err(state, "unexpected json lexer result for json in #{verb}", other)
    end
  end

  # Turns the tokens of one control line into a message. Verbs that carry a
  # payload (MSG / PUB) switch the parser into the :body phase.
  defp parse_verb(state, tokens, rest) do
    case :nats_parser.parse(tokens) do
      {:ok, {:info, str}} ->
        parse_json(state, rest, :info, str)

      {:ok, {:connect, str}} ->
        parse_json(state, rest, :connect, str)

      {:ok, verb = {:msg, _, _, _, len}} ->
        # + 2 accounts for the CRLF that terminates the body.
        parse_body(%{state | ps: :body, size: len + 2, msg: <<>>, verb: verb}, rest)

      {:ok, verb = {:pub, _, _, len}} ->
        parse_body(%{state | ps: :body, size: len + 2, msg: <<>>, verb: verb}, rest)

      {:ok, verb} ->
        {:ok, verb, rest, state}

      {:error, {_, _, mesg}} ->
        parse_err(state, "invalid message", mesg)

      other ->
        parse_err(state, "unexpected parser return", other)
    end
  end

  # The whole body has been read. Any bytes left in `rest` belong to the
  # next message and are returned to the caller.
  defp parse_body(state = %{ps: :body, size: 0, msg: bd, verb: v}, rest) do
    # The last element of the verb tuple holds the declared body length; it
    # is replaced with the body itself once the trailer has been verified.
    tsz = tuple_size(v) - 1
    body_size = elem(v, tsz)
    part = binary_part(bd, body_size, 2)

    if part != @endverb do
      parse_err(state, "missing body trailer for #{inspect(v)}", part)
    else
      {:ok, put_elem(v, tsz, binary_part(bd, 0, body_size)), rest, @default_state}
    end
  end

  # Input is exhausted but the body is not complete: ask the caller for the
  # remaining `sz` bytes.
  defp parse_body(state = %{ps: :body, size: sz}, <<>>) do
    {:cont, sz, state}
  end

  # Body bytes are still missing and input is available: consume as much as
  # possible and recurse.
  defp parse_body(state = %{ps: :body, size: sz, msg: body}, rest) do
    to_read = min(sz, byte_size(rest))
    <<read::binary-size(to_read), remainder::binary>> = rest
    parse_body(%{state | size: sz - to_read, msg: body <> read}, remainder)
  end

  # Encoders for each message tuple, without the final CRLF (added by
  # encode/1). A `nil` reply subject / queue group is omitted from the line.
  defp encode1({:ok}), do: "+OK"
  defp encode1({:ping}), do: "PING"
  defp encode1({:pong}), do: "PONG"
  defp encode1({:err, msg}), do: "-ERR " <> msg
  defp encode1({:info, json}), do: "INFO " <> to_json(json)
  defp encode1({:connect, json}), do: "CONNECT " <> to_json(json)

  defp encode1({:msg, sub, sid, nil, what}) do
    "MSG " <> sub <> " " <> sid <> " " <> to_string(byte_size(what)) <> @endverb <> what
  end

  defp encode1({:msg, sub, sid, ret, what}) do
    "MSG " <>
      sub <>
      " " <> sid <> " " <> ret <> " " <> to_string(byte_size(what)) <> @endverb <> what
  end

  defp encode1({:pub, sub, nil, what}) do
    "PUB " <> sub <> " " <> to_string(byte_size(what)) <> @endverb <> what
  end

  defp encode1({:pub, sub, reply, what}) do
    "PUB " <> sub <> " " <> reply <> " " <> to_string(byte_size(what)) <> @endverb <> what
  end

  defp encode1({:sub, subject, nil, sid}) do
    "SUB " <> subject <> " " <> sid
  end

  defp encode1({:sub, subject, queue, sid}) do
    "SUB " <> subject <> " " <> queue <> " " <> sid
  end
end