1%% Copyright (c) 2016-2018, Loïc Hoguin <essen@ninenines.eu>
2%%
3%% Permission to use, copy, modify, and/or distribute this software for any
4%% purpose with or without fee is hereby granted, provided that the above
5%% copyright notice and this permission notice appear in all copies.
6%%
7%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14
15-module(gun_http2).
16
17-export([check_options/1]).
18-export([name/0]).
19-export([init/4]).
20-export([handle/2]).
21-export([close/1]).
22-export([keepalive/1]).
23-export([request/8]).
24-export([request/9]).
25-export([data/5]).
26-export([cancel/3]).
27-export([down/1]).
28
%% State kept for each stream tracked in #http2_state.streams.
-record(stream, {
	%% HTTP/2 stream identifier.
	id :: non_neg_integer(),
	%% Reference included in the messages sent to reply_to about this stream.
	ref :: reference(),
	%% Process that receives the messages about this stream.
	reply_to :: pid(),
	%% Whether we finished sending data.
	local = nofin :: fin | nofin,
	%% Local flow control window (how much we can send).
	local_window :: integer(),
	%% Buffered data waiting for the flow control window to increase.
	%% Each item is {IsFin, DataSize, Data}.
	local_buffer = queue:new() :: queue:queue(
		{fin | nofin, non_neg_integer(), iolist()}),
	%% Sum of the DataSize of all the items in local_buffer.
	local_buffer_size = 0 :: non_neg_integer(),
	%% Trailers to send once local_buffer has been emptied, if any.
	local_trailers = undefined :: undefined | cow_http:headers(),
	%% Whether we finished receiving data.
	remote = nofin :: fin | nofin,
	%% Remote flow control window (how much we accept to receive).
	remote_window :: integer(),
	%% Content handlers state.
	handler_state :: undefined | gun_content_handler:state()
}).
49
%% Connection-wide state of the HTTP/2 protocol.
-record(http2_state, {
	%% Process given as Owner to init/4.
	owner :: pid(),
	%% Socket and transport module used to send frames.
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	%% Protocol options as given to init/4.
	opts = #{} :: map(), %% @todo
	%% Content handlers used to initialize the handlers of each response.
	content_handlers :: gun_content_handler:opt(),
	%% Received data that could not be parsed into a full frame yet.
	buffer = <<>> :: binary(),

	%% Settings we advertise in the SETTINGS frame of the preface.
	local_settings = #{
		initial_window_size => 65535,
		max_frame_size => 16384
	} :: map(),
	%% Settings received from the server, merged over these defaults.
	remote_settings = #{
		initial_window_size => 65535
	} :: map(),

	%% Connection-wide flow control window.
	local_window = 65535 :: integer(), %% How much we can send.
	remote_window = 65535 :: integer(), %% How much we accept to receive.

	%% Streams currently tracked on this connection.
	streams = [] :: [#stream{}],
	%% Identifier of the next stream we open. Increases by 2 per request.
	stream_id = 1 :: non_neg_integer(),

	%% The client starts by sending a sequence of bytes as a preface,
	%% followed by a potentially empty SETTINGS frame. Then the connection
	%% is established and continues normally. An exception is when a HEADERS
	%% frame is sent followed by CONTINUATION frames: no other frame can be
	%% sent in between.
	%%
	%% The default is preface so that the value always belongs to the
	%% declared type; init/4 sets the same value explicitly.
	parse_state = preface :: preface | normal
		| {continuation, non_neg_integer(), fin | nofin, binary()},

	%% HPACK decoding and encoding state.
	decode_state = cow_hpack:init() :: cow_hpack:state(),
	encode_state = cow_hpack:init() :: cow_hpack:state()
}).
85
%% Check the protocol options given as a map.
%%
%% Returns ok, or an {error, _} tuple describing the first invalid option.
check_options(Opts) ->
	OptsList = maps:to_list(Opts),
	do_check_options(OptsList).
88
%% Check the options one at a time.
%%
%% Returns ok when every option is known and valid. Otherwise returns
%% {error, {options, {http2, Opt}}} where Opt is the first invalid option.
do_check_options([]) ->
	ok;
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
	case gun_content_handler:check_option(Handlers) of
		ok -> do_check_options(Opts);
		%% The error is tagged http2 like for every other option of this
		%% protocol, so that callers can tell which protocol rejected it.
		error -> {error, {options, {http2, Opt}}}
	end;
do_check_options([{keepalive, infinity}|Opts]) ->
	do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	do_check_options(Opts);
%% @todo max_frame_size_sent
do_check_options([Opt|_]) ->
	{error, {options, {http2, Opt}}}.
103
%% Name of the protocol implemented by this module.
name() ->
	http2.
105
%% Build the initial protocol state and send the client connection
%% preface, immediately followed by our SETTINGS frame.
%%
%% The content handlers default to [gun_data_h] when not configured.
init(Owner, Socket, Transport, Opts) ->
	State = #http2_state{
		owner=Owner,
		socket=Socket,
		transport=Transport,
		opts=Opts,
		content_handlers=maps:get(content_handlers, Opts, [gun_data_h]),
		parse_state=preface
	},
	Preface = << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
	%% The settings sent are the record's default local settings.
	SettingsFrame = cow_http2:settings(State#http2_state.local_settings),
	Transport:send(Socket, [Preface, SettingsFrame]),
	State.
118
%% Handle data received from the socket.
%%
%% The data is appended to what was previously buffered, the buffer
%% is emptied and parsing resumes on the combined binary.
handle(Data, State0=#http2_state{buffer=Pending}) ->
	Received = << Pending/binary, Data/binary >>,
	State = State0#http2_state{buffer= <<>>},
	parse(Received, State).
121
%% Parse as many frames as possible out of the data.
%%
%% Returns {state, State} when more data is needed to continue, or the
%% result of terminate/2 (close or {error, Reason}) when the connection
%% must go down.
%%
%% In the preface state the first frame must be a SETTINGS frame.
parse(Data0, State0=#http2_state{buffer=Buffer, parse_state=preface}) ->
	Data = << Buffer/binary, Data0/binary >>,
	case cow_http2:parse(Data) of
		{ok, Frame, Rest} when element(1, Frame) =:= settings ->
			case frame(Frame, State0#http2_state{parse_state=normal}) of
				close -> close;
				Error = {error, _} -> Error;
				State -> parse(Rest, State)
			end;
		%% The frame type is the fourth byte. With fewer bytes than that
		%% we cannot tell whether this is a SETTINGS frame, so we wait
		%% for more data instead of rejecting a preface split by the network.
		more when byte_size(Data) < 4 ->
			{state, State0#http2_state{buffer=Data}};
		more ->
			case Data of
				%% Maybe we have a proper SETTINGS frame.
				<<_:24,4:8,_/bits>> ->
					{state, State0#http2_state{buffer=Data}};
				%% Not a SETTINGS frame, this is an invalid preface.
				_ ->
					preface_error(Data, State0)
			end;
		%% Any error in the preface is converted to this specific error
		%% to make debugging the problem easier (it's the server's fault).
		_ ->
			preface_error(Data, State0)
	end;
parse(Data0, State0=#http2_state{buffer=Buffer, parse_state=PS}) ->
	Data = << Buffer/binary, Data0/binary >>,
	case cow_http2:parse(Data) of
		{ok, Frame, Rest} when PS =:= normal ->
			case frame(Frame, State0) of
				close -> close;
				Error = {error, _} -> Error;
				State1 -> parse(Rest, State1)
			end;
		%% In the middle of a header block only CONTINUATION frames are valid.
		{ok, Frame, Rest} when element(1, PS) =:= continuation ->
			case continuation_frame(Frame, State0) of
				close -> close;
				Error = {error, _} -> Error;
				State1 -> parse(Rest, State1)
			end;
		{ignore, _} when element(1, PS) =:= continuation ->
			terminate(State0, {connection_error, protocol_error,
				'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'});
		{ignore, Rest} ->
			parse(Rest, State0);
		{stream_error, StreamID, Reason, Human, Rest} ->
			parse(Rest, stream_reset(State0, StreamID, {stream_error, Reason, Human}));
		Error = {connection_error, _, _} ->
			terminate(State0, Error);
		more ->
			{state, State0#http2_state{buffer=Data}}
	end.

%% Terminate the connection because of an invalid preface. The reason
%% hints at an HTTP/1 response when the data looks like one.
preface_error(<<"HTTP/1",_/bits>>, State) ->
	terminate(State, {connection_error, protocol_error,
		'Invalid connection preface received. Appears to be an HTTP/1 response? (RFC7540 3.5)'});
preface_error(_, State) ->
	terminate(State, {connection_error, protocol_error,
		'Invalid connection preface received. (RFC7540 3.5)'}).
182
%% Handle one frame received from the server.
%%
%% Returns the updated state, or the result of terminate/2,3
%% (close or {error, Reason}) when the connection must go down.
%%
%% DATA frame.
frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) ->
	case get_stream_by_id(StreamID, State0) of
		Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} ->
			Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
			%% Both windows shrink by the size of the data received.
			{Stream, State} = send_window_update(
				Stream0#stream{remote_window=StreamWindow - byte_size(Data),
					handler_state=Handlers},
				State0#http2_state{remote_window=ConnWindow - byte_size(Data)}),
			remote_fin(Stream, State, IsFin);
		_ ->
			%% @todo protocol_error if not existing
			stream_reset(State0, StreamID, {stream_error, stream_closed,
				'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, State) ->
	stream_decode_init(State, StreamID, IsFin, HeaderBlock);
%% HEADERS frame starting a headers block. Enter continuation mode.
frame({headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}, State) ->
	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% Single HEADERS frame headers block with priority.
frame({headers, StreamID, IsFin, head_fin,
		_IsExclusive, _DepStreamID, _Weight, HeaderBlock}, State) ->
	stream_decode_init(State, StreamID, IsFin, HeaderBlock);
%% HEADERS frame with priority starting a headers block. Enter continuation mode.
%%
%% @todo Handle priority.
frame({headers, StreamID, IsFin, head_nofin,
		_IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}, State) ->
	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% PRIORITY frame. The priority information is currently ignored.
%%
%% @todo Validate StreamID?
%% @todo Handle priority.
frame({priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}, State) ->
	State;
%% @todo RST_STREAM frame.
frame({rst_stream, StreamID, Reason}, State) ->
	stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'});
%% SETTINGS frame.
frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport,
		remote_settings=Settings0}) ->
	Transport:send(Socket, cow_http2:settings_ack()),
	State#http2_state{remote_settings=maps:merge(Settings0, Settings)};
%% Ack for a previously sent SETTINGS frame.
frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) ->
	%% @todo Apply SETTINGS that require synchronization.
	State;
%% PUSH_PROMISE frame.
%% @todo Continuation.
frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
		State=#http2_state{decode_state=DecodeState0}) ->
	case get_stream_by_id(PromisedStreamID, State) of
		false ->
			case get_stream_by_id(StreamID, State) of
				Stream = #stream{} ->
					try cow_hpack:decode(HeaderBlock, DecodeState0) of
						{Headers, DecodeState} ->
							%% The decode state is kept even when the promise
							%% turns out to be malformed: the header block
							%% was decoded successfully.
							push_promise_init(State#http2_state{decode_state=DecodeState},
								Stream, PromisedStreamID, Headers)
					catch _:_ ->
						terminate(State, {connection_error, compression_error,
							'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
					end;
				_ ->
					stream_reset(State, StreamID, {stream_error, stream_closed,
						'PUSH_PROMISE frame received for a closed or non-existent stream. (RFC7540 6.6)'})
			end;
		_ ->
			%% The promised stream identifier is already in use.
			stream_reset(State, StreamID, {stream_error, protocol_error,
				'PUSH_PROMISE frame received for a promised stream ID already in use. (RFC7540 5.1.1)'})
	end;
%% PING frame.
frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) ->
	Transport:send(Socket, cow_http2:ping_ack(Opaque)),
	State;
%% Ack for a previously sent PING frame.
%%
%% @todo Might want to check contents but probably a waste of time.
frame({ping_ack, _Opaque}, State) ->
	State;
%% GOAWAY frame.
frame(Frame={goaway, StreamID, _, _}, State) ->
	terminate(State, StreamID, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow})
		when ConnWindow + Increment > 16#7fffffff ->
	terminate(State, {connection_error, flow_control_error,
		'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'});
frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) ->
	%% The larger window may allow buffered data to be sent.
	send_data(State#http2_state{local_window=ConnWindow + Increment});
%% Stream-specific WINDOW_UPDATE frame.
frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) ->
	case lists:keyfind(StreamID, #stream.id, Streams0) of
		#stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
			stream_reset(State0, StreamID, {stream_error, flow_control_error,
				'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'});
		Stream0 = #stream{local_window=StreamWindow} ->
			{State, Stream} = send_data(State0,
				Stream0#stream{local_window=StreamWindow + Increment}),
			Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
			State#http2_state{streams=Streams};
		false ->
			%% @todo Receiving this frame on a stream in the idle state is an error.
			%% WINDOW_UPDATE frames may be received for a short period of time
			%% after a stream is closed. They must be ignored.
			State0
	end;
%% Unexpected CONTINUATION frame.
frame({continuation, StreamID, _, _}, State) ->
	terminate(State, StreamID, {connection_error, protocol_error,
		'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).

%% Notify the process of the associated stream about the pushed resource
%% and start tracking the promised stream. The associated stream is reset
%% when a request pseudo-header is missing from the promise.
push_promise_init(State=#http2_state{streams=Streams},
		#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
		PromisedStreamID, Headers0) ->
	case take_push_pseudo_headers(Headers0) of
		{ok, {Method, Scheme, Authority, Path}, Headers} ->
			NewStreamRef = make_ref(),
			ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method,
				iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
			%% We cannot send anything on a pushed stream: local is fin.
			NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo,
				nofin, fin, State),
			State#http2_state{streams=[NewStream|Streams]};
		error ->
			stream_reset(State, StreamID, {stream_error, protocol_error,
				'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'})
	end.

%% Extract the request pseudo-headers of a push promise.
%%
%% Returns {ok, {Method, Scheme, Authority, Path}, RemainingHeaders},
%% or error when one of them is missing.
take_push_pseudo_headers(Headers) ->
	take_push_pseudo_headers(
		[<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>],
		Headers, []).

take_push_pseudo_headers([], Headers, Acc) ->
	{ok, list_to_tuple(lists:reverse(Acc)), Headers};
take_push_pseudo_headers([Name|Names], Headers0, Acc) ->
	case lists:keytake(Name, 1, Headers0) of
		{value, {_, Value}, Headers} ->
			take_push_pseudo_headers(Names, Headers, [Value|Acc]);
		false ->
			error
	end.
307
%% Handle a frame received while in the middle of a header block.
%%
%% Only CONTINUATION frames for the stream of the header block are
%% accepted. The final fragment triggers the decoding of the whole block;
%% anything else terminates the connection with a protocol error.
continuation_frame({continuation, StreamID, head_fin, Fragment},
		State0=#http2_state{parse_state={continuation, StreamID, IsFin, Acc}}) ->
	HeaderBlock = << Acc/binary, Fragment/binary >>,
	State = State0#http2_state{parse_state=normal},
	stream_decode_init(State, StreamID, IsFin, HeaderBlock);
continuation_frame({continuation, StreamID, head_nofin, Fragment},
		State=#http2_state{parse_state={continuation, StreamID, IsFin, Acc}}) ->
	Pending = {continuation, StreamID, IsFin, << Acc/binary, Fragment/binary >>},
	State#http2_state{parse_state=Pending};
continuation_frame(_, State) ->
	Error = {connection_error, protocol_error,
		'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
	terminate(State, Error).
320
%% Send WINDOW_UPDATE frames when the remote windows get low.
%%
%% The connection window is checked first, then the stream window. A
%% window at or below its minimum is increased by that same minimum.
%% Returns {Stream, State} with the windows updated accordingly.
send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0},
		State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) ->
	%% @todo We should make the windows configurable.
	MinConnWindow = 8000000,
	MinStreamWindow = 1000000,
	ConnWindow = case ConnWindow0 =< MinConnWindow of
		true ->
			Transport:send(Socket, cow_http2:window_update(MinConnWindow)),
			ConnWindow0 + MinConnWindow;
		false ->
			ConnWindow0
	end,
	StreamWindow = case StreamWindow0 =< MinStreamWindow of
		true ->
			Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)),
			StreamWindow0 + MinStreamWindow;
		false ->
			StreamWindow0
	end,
	{Stream#stream{remote_window=StreamWindow},
		State#http2_state{remote_window=ConnWindow}}.
342
%% The connection was closed: notify the process of every tracked stream.
close(#http2_state{streams=ActiveStreams}) ->
	close_streams(ActiveStreams).
345
%% Send a gun_error message with reason closed to the reply_to
%% process of each stream, in order. Always returns ok.
close_streams(Streams) ->
	lists:foreach(fun(#stream{ref=StreamRef, reply_to=ReplyTo}) ->
		ReplyTo ! {gun_error, self(), StreamRef, {closed,
			"The connection was lost."}}
	end, Streams).
352
%% Send a PING frame with the opaque value 0. The state is unchanged.
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
	Ping = cow_http2:ping(0),
	Transport:send(Socket, Ping),
	State.
356
%% Send a request without providing the body.
%%
%% A body is expected to follow (via data/5) only when the headers
%% include content-type or content-length. Otherwise the HEADERS
%% frame also ends the stream on our side.
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
		streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo,
		Method, Host, Port, Path, Headers) ->
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport,
		Method, Host, Port, Path, Headers),
	HasBodyHeader = lists:keymember(<<"content-type">>, 1, Headers)
		orelse lists:keymember(<<"content-length">>, 1, Headers),
	IsFin = case HasBodyHeader of
		true -> nofin;
		false -> fin
	end,
	HeadersFrame = cow_http2:headers(StreamID, IsFin, HeaderBlock),
	Transport:send(Socket, HeadersFrame),
	Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State),
	%% Stream identifiers initiated by the client increase by 2.
	State#http2_state{
		streams=[Stream|Streams],
		stream_id=StreamID + 2,
		encode_state=EncodeState
	}.
369
%% Send a request along with its full body.
%%
%% The content-length header is set (or replaced) with the size of the
%% body. The body is then sent or buffered depending on flow control.
%%
%% @todo Handle Body > 16MB. (split it out into many frames)
request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
		streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo,
		Method, Host, Port, Path, Headers0, Body) ->
	BodySize = integer_to_binary(iolist_size(Body)),
	Headers = lists:keystore(<<"content-length">>, 1, Headers0,
		{<<"content-length">>, BodySize}),
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport,
		Method, Host, Port, Path, Headers),
	HeadersFrame = cow_http2:headers(StreamID, nofin, HeaderBlock),
	Transport:send(Socket, HeadersFrame),
	Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0),
	{State, Stream} = send_data(State0, Stream0, fin, Body),
	State#http2_state{
		streams=[Stream|Streams],
		stream_id=StreamID + 2,
		encode_state=EncodeState
	}.
381
%% Build and HPACK-encode the header list of a request.
%%
%% The :authority pseudo-header is the value of the host header when
%% there is one, and is built from the host and port otherwise. The
%% host header and the connection-related headers are not sent.
%% Returns the result of cow_hpack:encode/2.
prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
	Host2 = case Host0 of
		{local, _SocketPath} -> <<>>;
		Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
		_ -> Host0
	end,
	Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
		{_, Host} -> Host;
		_ -> [Host2, $:, integer_to_binary(Port)]
	end,
	%% @todo We also must remove any header found in the connection header.
	Excluded = [
		<<"upgrade">>,
		<<"transfer-encoding">>,
		<<"proxy-connection">>,
		<<"keep-alive">>,
		<<"connection">>,
		<<"host">>
	],
	%% Only the first occurrence of each of these headers is removed.
	Headers1 = lists:foldl(fun(Name, Acc) ->
		lists:keydelete(Name, 1, Acc)
	end, Headers0, Excluded),
	Scheme = case Transport of
		gun_tls -> <<"https">>;
		gun_tcp -> <<"http">>
	end,
	PseudoHeaders = [
		{<<":method">>, Method},
		{<<":scheme">>, Scheme},
		{<<":authority">>, Authority},
		{<<":path">>, Path}
	],
	cow_hpack:encode(PseudoHeaders ++ Headers1, EncodeState).
410
%% Send body data for the stream identified by StreamRef.
%%
%% An error message is sent to ReplyTo when the stream cannot be found
%% or when we already finished sending on it.
data(State0, StreamRef, ReplyTo, IsFin, Data) ->
	case get_stream_by_ref(StreamRef, State0) of
		false ->
			error_stream_not_found(State0, StreamRef, ReplyTo);
		#stream{local=fin} ->
			error_stream_closed(State0, StreamRef, ReplyTo);
		Stream0 = #stream{} ->
			{State, Stream} = send_data(State0, Stream0, IsFin, Data),
			%% Store the updated stream, or drop it if both sides are done.
			maybe_delete_stream(State, Stream)
	end.
421
%% Try to send the data buffered by the streams of the connection.
%%
%% Streams are visited in the order they are stored, sending what
%% we can until everything is sent or the window has no space left.
%%
%% @todo Should we ever want to implement the PRIORITY mechanism,
%% this would be the place to do it.
send_data(State=#http2_state{streams=AllStreams}) ->
	resume_streams(State, AllStreams, []).
428
429%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
430%% the local stream windows for all active streams and perhaps
431%% resume sending data.
432%update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) ->
433%	Streams = [
434%		S#stream{local_window=StreamWindow + Increment}
435%	|| S=#stream{local_window=StreamWindow} <- Streams0],
436%	resume_streams(State, Streams, []).
437
438%% When we receive an ack to a SETTINGS frame we sent we need to update
439%% the remote stream windows for all active streams.
440%update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) ->
441%	Streams = [
442%		S#stream{remote_window=StreamWindow + Increment}
443%	|| S=#stream{remote_window=StreamWindow} <- Streams0],
444%	State#http2_state{streams=Streams}.
445
%% Go over the streams and send their buffered data, stopping as
%% soon as the connection window is exhausted. The order of the
%% streams in the state is preserved.
resume_streams(State, [], Done) ->
	State#http2_state{streams=lists:reverse(Done)};
resume_streams(State0=#http2_state{local_window=ConnWindow}, Pending=[Stream0|Rest], Done) ->
	case ConnWindow =< 0 of
		%% While technically we should never get < 0 here, let's be on the safe side.
		true ->
			State0#http2_state{streams=lists:reverse(Done, Pending)};
		%% We rely on send_data/2 to do all the necessary checks about the stream.
		false ->
			{State, Stream} = send_data(State0, Stream0),
			resume_streams(State, Rest, [Stream|Done])
	end.
456
%% Send as much of the buffered data of a stream as possible.
%%
%% Returns {State, Stream}.
%%
%% The buffer is empty and trailers are pending: send them now.
send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers})
		when Trailers =/= undefined, (Local =:= idle orelse Local =:= nofin) ->
	send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers);
%% Nothing can or needs to be sent.
%% @todo It's possible that the stream terminates. We must remove it.
send_data(State=#http2_state{local_window=ConnWindow},
		Stream=#stream{local=Local, local_window=StreamWindow, local_buffer_size=Queued})
		when ConnWindow =< 0 orelse Local =:= fin
			orelse StreamWindow =< 0 orelse Queued =:= 0 ->
	{State, Stream};
%% Take the oldest item out of the buffer and try to send it. What
%% cannot be sent is put back at the front of the buffer (in_r).
send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=Queued}) ->
	%% We know there is an item in the queue.
	{{value, {IsFin, Size, Data}}, Q} = queue:out(Q0),
	Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=Queued - Size},
	{State, Stream} = send_data(State0, Stream1, IsFin, Data, in_r),
	send_data(State, Stream).
472
%% Send new data on a stream. What cannot be sent immediately
%% is added at the back of the stream's buffer.
send_data(State, Stream, IsFin, Data) ->
	Where = in,
	send_data(State, Stream, IsFin, Data, Where).
475
%% Send data or trailers on a stream, buffering what cannot be sent.
%%
%% The last argument is the name of the queue function (in or in_r)
%% used when data has to be buffered. Returns {State, Stream}.
%%
%% Trailers are sent immediately when the buffer is empty, even if the
%% window is empty. They are kept for later otherwise.
send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) ->
	send_trailers(State, Stream, Trailers);
send_data(State, Stream, fin, {trailers, Trailers}, _) ->
	{State, Stream#stream{local_trailers=Trailers}};
%% No space left in one of the windows: buffer the data.
send_data(State=#http2_state{local_window=ConnWindow},
		Stream=#stream{local_window=StreamWindow}, IsFin, Data, In)
		when ConnWindow =< 0 orelse StreamWindow =< 0 ->
	{State, queue_data(Stream, IsFin, Data, In)};
%% Send what fits in a single frame, then continue with the rest.
%% @todo Data given as a sendfile tuple is not supported.
send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
		remote_settings=RemoteSettings, local_window=ConnWindow},
		Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) ->
	RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384),
	%% The atom infinity compares greater than any integer.
	ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity),
	WindowLimit = min(ConnWindow, StreamWindow),
	FrameLimit = min(RemoteMaxFrameSize, ConfiguredMaxFrameSize),
	MaxSendSize = min(WindowLimit, FrameLimit),
	case iolist_size(Data) of
		DataSize when DataSize =< MaxSendSize ->
			Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
			{State#http2_state{local_window=ConnWindow - DataSize},
				Stream#stream{local=IsFin, local_window=StreamWindow - DataSize}};
		_ ->
			{Head, Tail} = cow_iolists:split(MaxSendSize, Data),
			Transport:send(Socket, cow_http2:data(StreamID, nofin, Head)),
			send_data(State#http2_state{local_window=ConnWindow - MaxSendSize},
				Stream#stream{local_window=StreamWindow - MaxSendSize},
				IsFin, Tail, In)
	end.
523
%% Encode and send the trailers in a HEADERS frame that ends the
%% stream on our side. Returns {State, Stream}.
send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0},
		Stream=#stream{id=StreamID}, Trailers) ->
	{TrailerBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
	TrailersFrame = cow_http2:headers(StreamID, fin, TrailerBlock),
	Transport:send(Socket, TrailersFrame),
	{State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}.
529
%% Add data to the buffer of a stream and update the buffer size.
%%
%% In is the name of the queue function used to insert the item.
queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
	DataSize = iolist_size(Data),
	Item = {IsFin, DataSize, Data},
	Stream#stream{
		local_buffer=queue:In(Item, Q0),
		local_buffer_size=Size0 + DataSize
	}.
537
%% Cancel a stream: send RST_STREAM with the cancel error code and
%% stop tracking the stream. An error message is sent to ReplyTo
%% when the stream cannot be found.
cancel(State=#http2_state{socket=Socket, transport=Transport},
		StreamRef, ReplyTo) ->
	case get_stream_by_ref(StreamRef, State) of
		false ->
			error_stream_not_found(State, StreamRef, ReplyTo);
		#stream{id=StreamID} ->
			RstStream = cow_http2:rst_stream(StreamID, cancel),
			Transport:send(Socket, RstStream),
			delete_stream(StreamID, State)
	end.
547
%% Return the references of the streams killed by the connection going
%% down, along with an (always empty) list of unprocessed streams.
%%
%% @todo Add unprocessed streams when GOAWAY handling is done.
down(#http2_state{streams=ActiveStreams}) ->
	Killed = [StreamRef || #stream{ref=StreamRef} <- ActiveStreams],
	Unprocessed = [],
	{Killed, Unprocessed}.
552
%% Terminate the connection: notify the processes of all streams,
%% send a GOAWAY frame and return close or {error, Reason}.
terminate(#http2_state{socket=Socket, transport=Transport, streams=Streams}, Reason) ->
	%% Because a particular stream is unknown,
	%% we're sending the error message to all streams.
	%% @todo We should not send duplicate messages to processes.
	%% @todo We should probably also inform the owner process.
	Recipients = [ReplyTo || #stream{reply_to=ReplyTo} <- Streams],
	lists:foreach(fun(ReplyTo) ->
		ReplyTo ! {gun_error, self(), Reason}
	end, Recipients),
	%% @todo LastGoodStreamID
	GoAway = cow_http2:goaway(0, terminate_reason(Reason), <<>>),
	Transport:send(Socket, GoAway),
	terminate_ret(Reason).
562
%% Terminate the connection because of a specific stream.
%%
%% Only the process of that stream is notified when the stream is
%% known. Otherwise this is the same as terminate/2.
terminate(State=#http2_state{socket=Socket, transport=Transport}, StreamID, Reason) ->
	case get_stream_by_id(StreamID, State) of
		#stream{reply_to=ReplyTo} ->
			ReplyTo ! {gun_error, self(), Reason},
			%% @todo LastGoodStreamID
			GoAway = cow_http2:goaway(0, terminate_reason(Reason), <<>>),
			Transport:send(Socket, GoAway),
			terminate_ret(Reason);
		_ ->
			terminate(State, Reason)
	end.
573
%% Error code to put in the GOAWAY frame for a termination reason.
%% A connection error carries its own code; a stop is not an error.
terminate_reason({connection_error, ErrorCode, _HumanReadable}) ->
	ErrorCode;
terminate_reason({stop, _Frame, _HumanReadable}) ->
	no_error.
576
%% Value returned after terminating: connection errors are
%% returned as {error, Reason}, anything else results in close.
terminate_ret({connection_error, _, _} = ConnError) ->
	{error, ConnError};
terminate_ret(_Other) ->
	close.
579
580%% Stream functions.
581
%% Decode an HPACK-encoded header block and continue with the
%% processing of the headers. Any decoding failure is a connection
%% error of type compression_error.
stream_decode_init(State0=#http2_state{decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) ->
	try cow_hpack:decode(HeaderBlock, DecodeState0) of
		{Headers, DecodeState} ->
			State = State0#http2_state{decode_state=DecodeState},
			stream_pseudo_headers_init(State, StreamID, IsFin, Headers)
	catch _:_ ->
		Error = {connection_error, compression_error,
			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'},
		terminate(State0, Error)
	end.
591
%% Separate the pseudo-headers from the regular headers and continue
%% with the response. Invalid pseudo-headers reset the stream with
%% a protocol_error.
%%
%% @todo When we handle trailers properly, a response missing the
%% required :status pseudo-header must be rejected as malformed.
%% (RFC7540 8.1.2.3, RFC7540 8.1.2.4)
stream_pseudo_headers_init(State, StreamID, IsFin, Headers0) ->
	case pseudo_headers(Headers0, #{}) of
		{error, HumanReadable} ->
			StreamError = {stream_error, protocol_error, HumanReadable},
			stream_reset(State, StreamID, StreamError);
		{ok, PseudoHeaders, Headers} ->
			stream_resp_init(State, StreamID, IsFin, Headers, PseudoHeaders)
	end.
607
%% Collect the pseudo-headers found at the start of the headers list.
%%
%% Returns {ok, PseudoHeaders, Headers} where Headers starts at the
%% first header that is not a pseudo-header, or {error, HumanReadable}.
%% Only :status is accepted, with its value converted to an integer.
pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) ->
	case PseudoHeaders of
		#{status := _} ->
			{error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
		_ ->
			try cow_http:status_to_integer(Status) of
				IntStatus ->
					pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
			catch _:_ ->
				{error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
			end
	end;
pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
pseudo_headers(Headers, PseudoHeaders) ->
	{ok, PseudoHeaders, Headers}.
621
%% Process the headers received for a stream.
%%
%% Depending on the pseudo-headers this is an informational response,
%% a response or trailers. The stream's process is notified with
%% gun_inform, gun_response or gun_trailers respectively. The stream
%% is reset when it is unknown or when the server already finished
%% sending on it.
stream_resp_init(State=#http2_state{content_handlers=Handlers0},
		StreamID, IsFin, Headers, PseudoHeaders) ->
	case {get_stream_by_id(StreamID, State), PseudoHeaders} of
		%% Informational response (1xx).
		{#stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin}, #{status := Status}}
				when Status >= 100, Status =< 199 ->
			ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
			State;
		%% Response. Content handlers are only needed when a body follows.
		{Stream=#stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin}, #{status := Status}} ->
			ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
			Handlers = case IsFin of
				fin ->
					undefined;
				nofin ->
					gun_content_handler:init(ReplyTo, StreamRef,
						Status, Headers, Handlers0)
			end,
			remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
		%% @todo For now we assume that it's a trailer if there's no :status.
		%% A better state machine is needed to distinguish between that and errors.
		{Stream=#stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin}, _} ->
			%% @todo We probably want to pass this to gun_content_handler?
			ReplyTo ! {gun_trailers, self(), StreamRef, Headers},
			remote_fin(Stream, State, fin);
		_ ->
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'HEADERS frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end.
650
%% Reset a stream: send RST_STREAM to the server and, when the stream
%% is known, notify its process and stop tracking it.
stream_reset(State=#http2_state{socket=Socket, transport=Transport, streams=Streams0},
		StreamID, StreamError={stream_error, Reason, _}) ->
	RstStream = cow_http2:rst_stream(StreamID, Reason),
	Transport:send(Socket, RstStream),
	case lists:keytake(StreamID, #stream.id, Streams0) of
		false ->
			%% @todo Unknown stream. Not sure what to do here. Check again once all
			%% terminate calls have been written.
			State;
		{value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
			ReplyTo ! {gun_error, self(), StreamRef, StreamError},
			State#http2_state{streams=Streams}
	end.
663
%% Tell ReplyTo that the stream was already closed. The state is unchanged.
error_stream_closed(State, StreamRef, ReplyTo) ->
	Error = {badstate, "The stream has already been closed."},
	ReplyTo ! {gun_error, self(), StreamRef, Error},
	State.
668
%% Tell ReplyTo that the stream is not known. The state is unchanged.
error_stream_not_found(State, StreamRef, ReplyTo) ->
	Error = {badstate, "The stream cannot be found."},
	ReplyTo ! {gun_error, self(), StreamRef, Error},
	State.
673
674%% Streams.
675%% @todo probably change order of args and have state first?
676
%% Create the record for a new stream.
%%
%% What we accept to receive is bound by our own initial window
%% size; what we can send is bound by the one of the server.
new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{
		local_settings=#{initial_window_size := OurInitialWindow},
		remote_settings=#{initial_window_size := TheirInitialWindow}}) ->
	#stream{
		id=StreamID,
		ref=StreamRef,
		reply_to=ReplyTo,
		local=Local,
		local_window=TheirInitialWindow,
		remote=Remote,
		remote_window=OurInitialWindow
	}.
683
%% Find a stream by its HTTP/2 identifier. Returns false when not found.
get_stream_by_id(StreamID, #http2_state{streams=ActiveStreams}) ->
	lists:keyfind(StreamID, #stream.id, ActiveStreams).
686
%% Find a stream by its reference. Returns false when not found.
get_stream_by_ref(StreamRef, #http2_state{streams=ActiveStreams}) ->
	lists:keyfind(StreamRef, #stream.ref, ActiveStreams).
689
%% Stop tracking the stream with the given identifier, if any.
delete_stream(StreamID, State=#http2_state{streams=Streams0}) ->
	State#http2_state{streams=lists:keydelete(StreamID, #stream.id, Streams0)}.
693
%% Update the state after receiving a frame for a stream.
%%
%% The stream is dropped when both sides are done. Otherwise its
%% record is replaced in the state with the remote flag updated.
remote_fin(Stream=#stream{local=fin}, State, fin) ->
	delete_stream(Stream#stream.id, State);
remote_fin(Stream, State=#http2_state{streams=Streams0}, IsFin) ->
	%% We always replace the stream in the state because
	%% the content handler state has changed.
	StreamID = Stream#stream.id,
	Updated = Stream#stream{remote=IsFin},
	State#http2_state{streams=
		lists:keyreplace(StreamID, #stream.id, Streams0, Updated)}.
702
%% Drop the stream when both sides are done with it,
%% otherwise store the updated stream record in the state.
maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) ->
	delete_stream(Stream#stream.id, State);
maybe_delete_stream(State=#http2_state{streams=Streams0}, Stream) ->
	StreamID = Stream#stream.id,
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
	State#http2_state{streams=Streams}.
708