1%% Copyright (c) 2016-2018, Loïc Hoguin <essen@ninenines.eu> 2%% 3%% Permission to use, copy, modify, and/or distribute this software for any 4%% purpose with or without fee is hereby granted, provided that the above 5%% copyright notice and this permission notice appear in all copies. 6%% 7%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 8%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 9%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 10%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 11%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 12%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 13%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 14 15-module(gun_http2). 16 17-export([check_options/1]). 18-export([name/0]). 19-export([init/4]). 20-export([handle/2]). 21-export([close/1]). 22-export([keepalive/1]). 23-export([request/8]). 24-export([request/9]). 25-export([data/5]). 26-export([cancel/3]). 27-export([down/1]). 28 29-record(stream, { 30 id :: non_neg_integer(), 31 ref :: reference(), 32 reply_to :: pid(), 33 %% Whether we finished sending data. 34 local = nofin :: fin | nofin, 35 %% Local flow control window (how much we can send). 36 local_window :: integer(), 37 %% Buffered data waiting for the flow control window to increase. 38 local_buffer = queue:new() :: queue:queue( 39 {fin | nofin, non_neg_integer(), iolist()}), 40 local_buffer_size = 0 :: non_neg_integer(), 41 local_trailers = undefined :: undefined | cow_http:headers(), 42 %% Whether we finished receiving data. 43 remote = nofin :: fin | nofin, 44 %% Remote flow control window (how much we accept to receive). 45 remote_window :: integer(), 46 %% Content handlers state. 47 handler_state :: undefined | gun_content_handler:state() 48}). 49 50-record(http2_state, { 51 owner :: pid(), 52 socket :: inet:socket() | ssl:sslsocket(), 53 transport :: module(), 54 opts = #{} :: map(), %% @todo 55 content_handlers :: gun_content_handler:opt(), 56 buffer = <<>> :: binary(), 57 58 local_settings = #{ 59 initial_window_size => 65535, 60 max_frame_size => 16384 61 } :: map(), 62 remote_settings = #{ 63 initial_window_size => 65535 64 } :: map(), 65 66 %% Connection-wide flow control window. 67 local_window = 65535 :: integer(), %% How much we can send. 68 remote_window = 65535 :: integer(), %% How much we accept to receive. 69 70 streams = [] :: [#stream{}], 71 stream_id = 1 :: non_neg_integer(), 72 73 %% The client starts by sending a sequence of bytes as a preface, 74 %% followed by a potentially empty SETTINGS frame. Then the connection 75 %% is established and continues normally. An exception is when a HEADERS 76 %% frame is sent followed by CONTINUATION frames: no other frame can be 77 %% sent in between. 78 parse_state = undefined :: preface | normal 79 | {continuation, cowboy_stream:streamid(), cowboy_stream:fin(), binary()}, 80 81 %% HPACK decoding and encoding state. 82 decode_state = cow_hpack:init() :: cow_hpack:state(), 83 encode_state = cow_hpack:init() :: cow_hpack:state() 84}). 85 86check_options(Opts) -> 87 do_check_options(maps:to_list(Opts)). 88 89do_check_options([]) -> 90 ok; 91do_check_options([Opt={content_handlers, Handlers}|Opts]) -> 92 case gun_content_handler:check_option(Handlers) of 93 ok -> do_check_options(Opts); 94 error -> {error, {options, {http, Opt}}} 95 end; 96do_check_options([{keepalive, infinity}|Opts]) -> 97 do_check_options(Opts); 98do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> 99 do_check_options(Opts); 100%% @todo max_frame_size_sent 101do_check_options([Opt|_]) -> 102 {error, {options, {http2, Opt}}}. 103 104name() -> http2. 105 106init(Owner, Socket, Transport, Opts) -> 107 Handlers = maps:get(content_handlers, Opts, [gun_data_h]), 108 State = #http2_state{owner=Owner, socket=Socket, 109 transport=Transport, opts=Opts, content_handlers=Handlers, 110 parse_state=preface}, 111 #http2_state{local_settings=Settings} = State, 112 %% Send the HTTP/2 preface. 113 Transport:send(Socket, [ 114 << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, 115 cow_http2:settings(Settings) 116 ]), 117 State. 118 119handle(Data, State=#http2_state{buffer=Buffer}) -> 120 parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). 121 122parse(Data0, State0=#http2_state{buffer=Buffer, parse_state=preface}) -> 123 Data = << Buffer/binary, Data0/binary >>, 124 case cow_http2:parse(Data) of 125 {ok, Frame, Rest} when element(1, Frame) =:= settings -> 126 case frame(Frame, State0#http2_state{parse_state=normal}) of 127 close -> close; 128 Error = {error, _} -> Error; 129 State -> parse(Rest, State) 130 end; 131 more -> 132 case Data of 133 %% Maybe we have a proper SETTINGS frame. 134 <<_:24,4:8,_/bits>> -> 135 {state, State0#http2_state{buffer=Data}}; 136 %% Not a SETTINGS frame, this is an invalid preface. 137 <<"HTTP/1",_/bits>> -> 138 terminate(State0, {connection_error, protocol_error, 139 'Invalid connection preface received. Appears to be an HTTP/1 response? (RFC7540 3.5)'}); 140 _ -> 141 terminate(State0, {connection_error, protocol_error, 142 'Invalid connection preface received. (RFC7540 3.5)'}) 143 end; 144 %% Any error in the preface is converted to this specific error 145 %% to make debugging the problem easier (it's the server's fault). 146 _ -> 147 Reason = case Data of 148 <<"HTTP/1",_/bits>> -> 149 'Invalid connection preface received. Appears to be an HTTP/1 response? (RFC7540 3.5)'; 150 _ -> 151 'Invalid connection preface received. (RFC7540 3.5)' 152 end, 153 terminate(State0, {connection_error, protocol_error, Reason}) 154 end; 155parse(Data0, State0=#http2_state{buffer=Buffer, parse_state=PS}) -> 156 Data = << Buffer/binary, Data0/binary >>, 157 case cow_http2:parse(Data) of 158 {ok, Frame, Rest} when PS =:= normal -> 159 case frame(Frame, State0) of 160 close -> close; 161 Error = {error, _} -> Error; 162 State1 -> parse(Rest, State1) 163 end; 164 {ok, Frame, Rest} when element(1, PS) =:= continuation -> 165 case continuation_frame(Frame, State0) of 166 close -> close; 167 Error = {error, _} -> Error; 168 State1 -> parse(Rest, State1) 169 end; 170 {ignore, _} when element(1, PS) =:= continuation -> 171 terminate(State0, {connection_error, protocol_error, 172 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}); 173 {ignore, Rest} -> 174 parse(Rest, State0); 175 {stream_error, StreamID, Reason, Human, Rest} -> 176 parse(Rest, stream_reset(State0, StreamID, {stream_error, Reason, Human})); 177 Error = {connection_error, _, _} -> 178 terminate(State0, Error); 179 more -> 180 {state, State0#http2_state{buffer=Data}} 181 end. 182 183%% DATA frame. 184frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) -> 185 case get_stream_by_id(StreamID, State0) of 186 Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} -> 187 Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), 188 {Stream, State} = send_window_update( 189 Stream0#stream{remote_window=StreamWindow - byte_size(Data), 190 handler_state=Handlers}, 191 State0#http2_state{remote_window=ConnWindow - byte_size(Data)}), 192 remote_fin(Stream, State, IsFin); 193 _ -> 194 %% @todo protocol_error if not existing 195 stream_reset(State0, StreamID, {stream_error, stream_closed, 196 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) 197 end; 198%% Single HEADERS frame headers block. 199frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, State) -> 200 stream_decode_init(State, StreamID, IsFin, HeaderBlock); 201%% HEADERS frame starting a headers block. Enter continuation mode. 202frame({headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}, State) -> 203 State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; 204%% Single HEADERS frame headers block with priority. 205frame({headers, StreamID, IsFin, head_fin, 206 _IsExclusive, _DepStreamID, _Weight, HeaderBlock}, State) -> 207 stream_decode_init(State, StreamID, IsFin, HeaderBlock); 208%% @todo HEADERS frame starting a headers block. Enter continuation mode. 209%frame(State, {headers, StreamID, IsFin, head_nofin, 210% _IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) -> 211% %% @todo Handle priority. 212% State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; 213%% @todo PRIORITY frame. 214%frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) -> 215% %% @todo Validate StreamID? 216% %% @todo Handle priority. 217% State; 218%% @todo RST_STREAM frame. 219frame({rst_stream, StreamID, Reason}, State) -> 220 stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'}); 221%% SETTINGS frame. 222frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport, 223 remote_settings=Settings0}) -> 224 Transport:send(Socket, cow_http2:settings_ack()), 225 State#http2_state{remote_settings=maps:merge(Settings0, Settings)}; 226%% Ack for a previously sent SETTINGS frame. 227frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) -> 228 %% @todo Apply SETTINGS that require synchronization. 229 State; 230%% PUSH_PROMISE frame. 231%% @todo Continuation. 232frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, 233 State=#http2_state{streams=Streams, decode_state=DecodeState0}) -> 234 case get_stream_by_id(PromisedStreamID, State) of 235 false -> 236 case get_stream_by_id(StreamID, State) of 237 #stream{ref=StreamRef, reply_to=ReplyTo} -> 238 try cow_hpack:decode(HeaderBlock, DecodeState0) of 239 {Headers0, DecodeState} -> 240 {Method, Scheme, Authority, Path, Headers} = try 241 {value, {_, Method0}, Headers1} = lists:keytake(<<":method">>, 1, Headers0), 242 {value, {_, Scheme0}, Headers2} = lists:keytake(<<":scheme">>, 1, Headers1), 243 {value, {_, Authority0}, Headers3} = lists:keytake(<<":authority">>, 1, Headers2), 244 {value, {_, Path0}, Headers4} = lists:keytake(<<":path">>, 1, Headers3), 245 {Method0, Scheme0, Authority0, Path0, Headers4} 246 catch error:badmatch -> 247 stream_reset(State, StreamID, {stream_error, protocol_error, 248 'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'}) 249 end, 250 NewStreamRef = make_ref(), 251 ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method, 252 iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, 253 NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo, 254 nofin, fin, State), 255 State#http2_state{streams=[NewStream|Streams], decode_state=DecodeState} 256 catch _:_ -> 257 terminate(State, {connection_error, compression_error, 258 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) 259 end; 260 _ -> 261 stream_reset(State, StreamID, {stream_error, stream_closed, 262 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) 263 end; 264 _ -> 265 stream_reset(State, StreamID, {stream_error, todo, ''}) 266 end; 267%% PING frame. 268frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) -> 269 Transport:send(Socket, cow_http2:ping_ack(Opaque)), 270 State; 271%% Ack for a previously sent PING frame. 272%% 273%% @todo Might want to check contents but probably a waste of time. 274frame({ping_ack, _Opaque}, State) -> 275 State; 276%% GOAWAY frame. 277frame(Frame={goaway, StreamID, _, _}, State) -> 278 terminate(State, StreamID, {stop, Frame, 'Client is going away.'}); 279%% Connection-wide WINDOW_UPDATE frame. 280frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) 281 when ConnWindow + Increment > 16#7fffffff -> 282 terminate(State, {connection_error, flow_control_error, 283 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}); 284frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) -> 285 send_data(State#http2_state{local_window=ConnWindow + Increment}); 286%% Stream-specific WINDOW_UPDATE frame. 287frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) -> 288 case lists:keyfind(StreamID, #stream.id, Streams0) of 289 #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff -> 290 stream_reset(State0, StreamID, {stream_error, flow_control_error, 291 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}); 292 Stream0 = #stream{local_window=StreamWindow} -> 293 {State, Stream} = send_data(State0, 294 Stream0#stream{local_window=StreamWindow + Increment}), 295 Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), 296 State#http2_state{streams=Streams}; 297 false -> 298 %% @todo Receiving this frame on a stream in the idle state is an error. 299 %% WINDOW_UPDATE frames may be received for a short period of time 300 %% after a stream is closed. They must be ignored. 301 State0 302 end; 303%% Unexpected CONTINUATION frame. 304frame({continuation, StreamID, _, _}, State) -> 305 terminate(State, StreamID, {connection_error, protocol_error, 306 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). 307 308continuation_frame({continuation, StreamID, head_fin, HeaderBlockFragment1}, 309 State=#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment0}}) -> 310 HeaderBlock = << HeaderBlockFragment0/binary, HeaderBlockFragment1/binary >>, 311 stream_decode_init(State#http2_state{parse_state=normal}, StreamID, IsFin, HeaderBlock); 312continuation_frame({continuation, StreamID, head_nofin, HeaderBlockFragment1}, 313 State=#http2_state{parse_state= 314 {continuation, StreamID, IsFin, HeaderBlockFragment0}}) -> 315 State#http2_state{parse_state={continuation, StreamID, IsFin, 316 << HeaderBlockFragment0/binary, HeaderBlockFragment1/binary >>}}; 317continuation_frame(_, State) -> 318 terminate(State, {connection_error, protocol_error, 319 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}). 320 321send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0}, 322 State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) -> 323 %% @todo We should make the windows configurable. 324 MinConnWindow = 8000000, 325 MinStreamWindow = 1000000, 326 ConnWindow = if 327 ConnWindow0 =< MinConnWindow -> 328 Transport:send(Socket, cow_http2:window_update(MinConnWindow)), 329 ConnWindow0 + MinConnWindow; 330 true -> 331 ConnWindow0 332 end, 333 StreamWindow = if 334 StreamWindow0 =< MinStreamWindow -> 335 Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)), 336 StreamWindow0 + MinStreamWindow; 337 true -> 338 StreamWindow0 339 end, 340 {Stream#stream{remote_window=StreamWindow}, 341 State#http2_state{remote_window=ConnWindow}}. 342 343close(#http2_state{streams=Streams}) -> 344 close_streams(Streams). 345 346close_streams([]) -> 347 ok; 348close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> 349 ReplyTo ! {gun_error, self(), StreamRef, {closed, 350 "The connection was lost."}}, 351 close_streams(Tail). 352 353keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> 354 Transport:send(Socket, cow_http2:ping(0)), 355 State. 356 357request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, 358 streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, 359 Method, Host, Port, Path, Headers) -> 360 {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), 361 IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) 362 orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of 363 true -> nofin; 364 false -> fin 365 end, 366 Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), 367 Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State), 368 State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. 369 370%% @todo Handle Body > 16MB. (split it out into many frames) 371request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, 372 streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, 373 Method, Host, Port, Path, Headers0, Body) -> 374 Headers = lists:keystore(<<"content-length">>, 1, Headers0, 375 {<<"content-length">>, integer_to_binary(iolist_size(Body))}), 376 {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), 377 Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), 378 Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0), 379 {State, Stream} = send_data(State0, Stream0, fin, Body), 380 State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. 381 382prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> 383 Host2 = case Host0 of 384 {local, _SocketPath} -> <<>>; 385 Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); 386 _ -> Host0 387 end, 388 Authority = case lists:keyfind(<<"host">>, 1, Headers0) of 389 {_, Host} -> Host; 390 _ -> [Host2, $:, integer_to_binary(Port)] 391 end, 392 %% @todo We also must remove any header found in the connection header. 393 Headers1 = 394 lists:keydelete(<<"host">>, 1, 395 lists:keydelete(<<"connection">>, 1, 396 lists:keydelete(<<"keep-alive">>, 1, 397 lists:keydelete(<<"proxy-connection">>, 1, 398 lists:keydelete(<<"transfer-encoding">>, 1, 399 lists:keydelete(<<"upgrade">>, 1, Headers0)))))), 400 Headers = [ 401 {<<":method">>, Method}, 402 {<<":scheme">>, case Transport of 403 gun_tls -> <<"https">>; 404 gun_tcp -> <<"http">> 405 end}, 406 {<<":authority">>, Authority}, 407 {<<":path">>, Path} 408 |Headers1], 409 cow_hpack:encode(Headers, EncodeState). 410 411data(State0, StreamRef, ReplyTo, IsFin, Data) -> 412 case get_stream_by_ref(StreamRef, State0) of 413 #stream{local=fin} -> 414 error_stream_closed(State0, StreamRef, ReplyTo); 415 Stream0 = #stream{} -> 416 {State, Stream} = send_data(State0, Stream0, IsFin, Data), 417 maybe_delete_stream(State, Stream); 418 false -> 419 error_stream_not_found(State0, StreamRef, ReplyTo) 420 end. 421 422%% @todo Should we ever want to implement the PRIORITY mechanism, 423%% this would be the place to do it. Right now, we just go over 424%% all streams and send what we can until either everything is 425%% sent or we run out of space in the window. 426send_data(State=#http2_state{streams=Streams}) -> 427 resume_streams(State, Streams, []). 428 429%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update 430%% the local stream windows for all active streams and perhaps 431%% resume sending data. 432%update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) -> 433% Streams = [ 434% S#stream{local_window=StreamWindow + Increment} 435% || S=#stream{local_window=StreamWindow} <- Streams0], 436% resume_streams(State, Streams, []). 437 438%% When we receive an ack to a SETTINGS frame we sent we need to update 439%% the remote stream windows for all active streams. 440%update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) -> 441% Streams = [ 442% S#stream{remote_window=StreamWindow + Increment} 443% || S=#stream{remote_window=StreamWindow} <- Streams0], 444% State#http2_state{streams=Streams}. 445 446resume_streams(State, [], Acc) -> 447 State#http2_state{streams=lists:reverse(Acc)}; 448%% While technically we should never get < 0 here, let's be on the safe side. 449resume_streams(State=#http2_state{local_window=ConnWindow}, Streams, Acc) 450 when ConnWindow =< 0 -> 451 State#http2_state{streams=lists:reverse(Acc, Streams)}; 452%% We rely on send_data/2 to do all the necessary checks about the stream. 453resume_streams(State0, [Stream0|Tail], Acc) -> 454 {State1, Stream} = send_data(State0, Stream0), 455 resume_streams(State1, Tail, [Stream|Acc]). 456 457send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers}) 458 when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) -> 459 send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers); 460%% @todo It's possible that the stream terminates. We must remove it. 461send_data(State=#http2_state{local_window=ConnWindow}, 462 Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize}) 463 when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> 464 {State, Stream}; 465send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) -> 466 %% We know there is an item in the queue. 467 {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), 468 {State, Stream} = send_data(State0, 469 Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, 470 IsFin, Data, in_r), 471 send_data(State, Stream). 472 473send_data(State, Stream, IsFin, Data) -> 474 send_data(State, Stream, IsFin, Data, in). 475 476%% We can send trailers immediately if the queue is empty, otherwise we queue. 477%% We always send trailer frames even if the window is empty. 478send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) -> 479 send_trailers(State, Stream, Trailers); 480send_data(State, Stream, fin, {trailers, Trailers}, _) -> 481 {State, Stream#stream{local_trailers=Trailers}}; 482%% Send data immediately if we can, buffer otherwise. 483send_data(State=#http2_state{local_window=ConnWindow}, 484 Stream=#stream{local_window=StreamWindow}, IsFin, Data, In) 485 when ConnWindow =< 0; StreamWindow =< 0 -> 486 {State, queue_data(Stream, IsFin, Data, In)}; 487send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, 488 remote_settings=RemoteSettings, local_window=ConnWindow}, 489 Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) -> 490 RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384), 491 ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity), 492 MaxSendSize = min( 493 min(ConnWindow, StreamWindow), 494 min(RemoteMaxFrameSize, ConfiguredMaxFrameSize) 495 ), 496 case Data of 497% {sendfile, Offset, Bytes, Path} when Bytes =< MaxSendSize -> 498% Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), 499% Transport:sendfile(Socket, Path, Offset, Bytes), 500% {State#http2_state{local_window=ConnWindow - Bytes}, 501% Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}}; 502% {sendfile, Offset, Bytes, Path} -> 503% Transport:send(Socket, cow_http2:data_header(StreamID, nofin, MaxSendSize)), 504% Transport:sendfile(Socket, Path, Offset, MaxSendSize), 505% send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, 506% Stream#stream{local_window=StreamWindow - MaxSendSize}, 507% IsFin, {sendfile, Offset + MaxSendSize, Bytes - MaxSendSize, Path}, In); 508 Iolist0 -> 509 IolistSize = iolist_size(Iolist0), 510 if 511 IolistSize =< MaxSendSize -> 512 Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)), 513 {State#http2_state{local_window=ConnWindow - IolistSize}, 514 Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}}; 515 true -> 516 {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0), 517 Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)), 518 send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, 519 Stream#stream{local_window=StreamWindow - MaxSendSize}, 520 IsFin, More, In) 521 end 522 end. 523 524send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0}, 525 Stream=#stream{id=StreamID}, Trailers) -> 526 {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0), 527 Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), 528 {State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}. 529 530queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> 531 DataSize = case Data of 532% {sendfile, _, Bytes, _} -> Bytes; 533 Iolist -> iolist_size(Iolist) 534 end, 535 Q = queue:In({IsFin, DataSize, Data}, Q0), 536 Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. 537 538cancel(State=#http2_state{socket=Socket, transport=Transport}, 539 StreamRef, ReplyTo) -> 540 case get_stream_by_ref(StreamRef, State) of 541 #stream{id=StreamID} -> 542 Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), 543 delete_stream(StreamID, State); 544 false -> 545 error_stream_not_found(State, StreamRef, ReplyTo) 546 end. 547 548%% @todo Add unprocessed streams when GOAWAY handling is done. 549down(#http2_state{streams=Streams}) -> 550 KilledStreams = [Ref || #stream{ref=Ref} <- Streams], 551 {KilledStreams, []}. 552 553terminate(#http2_state{socket=Socket, transport=Transport, streams=Streams}, Reason) -> 554 %% Because a particular stream is unknown, 555 %% we're sending the error message to all streams. 556 %% @todo We should not send duplicate messages to processes. 557 %% @todo We should probably also inform the owner process. 558 _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], 559 %% @todo LastGoodStreamID 560 Transport:send(Socket, cow_http2:goaway(0, terminate_reason(Reason), <<>>)), 561 terminate_ret(Reason). 562 563terminate(State=#http2_state{socket=Socket, transport=Transport}, StreamID, Reason) -> 564 case get_stream_by_id(StreamID, State) of 565 #stream{reply_to=ReplyTo} -> 566 ReplyTo ! {gun_error, self(), Reason}, 567 %% @todo LastGoodStreamID 568 Transport:send(Socket, cow_http2:goaway(0, terminate_reason(Reason), <<>>)), 569 terminate_ret(Reason); 570 _ -> 571 terminate(State, Reason) 572 end. 573 574terminate_reason({connection_error, Reason, _}) -> Reason; 575terminate_reason({stop, _, _}) -> no_error. 576 577terminate_ret(Reason={connection_error, _, _}) -> {error, Reason}; 578terminate_ret(_) -> close. 579 580%% Stream functions. 581 582stream_decode_init(State=#http2_state{decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) -> 583 try cow_hpack:decode(HeaderBlock, DecodeState0) of 584 {Headers, DecodeState} -> 585 stream_pseudo_headers_init(State#http2_state{decode_state=DecodeState}, 586 StreamID, IsFin, Headers) 587 catch _:_ -> 588 terminate(State, {connection_error, compression_error, 589 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) 590 end. 591 592stream_pseudo_headers_init(State, StreamID, IsFin, Headers0) -> 593 case pseudo_headers(Headers0, #{}) of 594 {ok, PseudoHeaders, Headers} -> 595 stream_resp_init(State, StreamID, IsFin, Headers, PseudoHeaders); 596%% @todo When we handle trailers properly: 597% {ok, _, _} -> 598% stream_malformed(State, StreamID, 599% 'A required pseudo-header was not found. (RFC7540 8.1.2.3)'); 600%% Or: 601% {ok, _, _} -> 602% stream_reset(State, StreamID, {stream_error, protocol_error, 603% 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) 604 {error, HumanReadable} -> 605 stream_reset(State, StreamID, {stream_error, protocol_error, HumanReadable}) 606 end. 607 608pseudo_headers([{<<":status">>, _}|_], #{status := _}) -> 609 {error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'}; 610pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) -> 611 try cow_http:status_to_integer(Status) of 612 IntStatus -> 613 pseudo_headers(Tail, PseudoHeaders#{status => IntStatus}) 614 catch _:_ -> 615 {error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'} 616 end; 617pseudo_headers([{<<":", _/bits>>, _}|_], _) -> 618 {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; 619pseudo_headers(Headers, PseudoHeaders) -> 620 {ok, PseudoHeaders, Headers}. 621 622stream_resp_init(State=#http2_state{content_handlers=Handlers0}, 623 StreamID, IsFin, Headers, PseudoHeaders) -> 624 case get_stream_by_id(StreamID, State) of 625 Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} -> 626 case PseudoHeaders of 627 #{status := Status} when Status >= 100, Status =< 199 -> 628 ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, 629 State; 630 #{status := Status} -> 631 ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, 632 Handlers = case IsFin of 633 fin -> undefined; 634 nofin -> 635 gun_content_handler:init(ReplyTo, StreamRef, 636 Status, Headers, Handlers0) 637 end, 638 remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin); 639 %% @todo For now we assume that it's a trailer if there's no :status. 640 %% A better state machine is needed to distinguish between that and errors. 641 _ -> 642 %% @todo We probably want to pass this to gun_content_handler? 643 ReplyTo ! {gun_trailers, self(), StreamRef, Headers}, 644 remote_fin(Stream, State, fin) 645 end; 646 _ -> 647 stream_reset(State, StreamID, {stream_error, stream_closed, 648 'HEADERS frame received for a closed or non-existent stream. (RFC7540 6.1)'}) 649 end. 650 651stream_reset(State=#http2_state{socket=Socket, transport=Transport, 652 streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> 653 Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), 654 case lists:keytake(StreamID, #stream.id, Streams0) of 655 {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> 656 ReplyTo ! {gun_error, self(), StreamRef, StreamError}, 657 State#http2_state{streams=Streams}; 658 false -> 659 %% @todo Unknown stream. Not sure what to do here. Check again once all 660 %% terminate calls have been written. 661 State 662 end. 663 664error_stream_closed(State, StreamRef, ReplyTo) -> 665 ReplyTo ! {gun_error, self(), StreamRef, {badstate, 666 "The stream has already been closed."}}, 667 State. 668 669error_stream_not_found(State, StreamRef, ReplyTo) -> 670 ReplyTo ! {gun_error, self(), StreamRef, {badstate, 671 "The stream cannot be found."}}, 672 State. 673 674%% Streams. 675%% @todo probably change order of args and have state first? 676 677new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{ 678 local_settings=#{initial_window_size := RemoteWindow}, 679 remote_settings=#{initial_window_size := LocalWindow}}) -> 680 #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, 681 remote=Remote, remote_window=RemoteWindow, 682 local=Local, local_window=LocalWindow}. 683 684get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> 685 lists:keyfind(StreamID, #stream.id, Streams). 686 687get_stream_by_ref(StreamRef, #http2_state{streams=Streams}) -> 688 lists:keyfind(StreamRef, #stream.ref, Streams). 689 690delete_stream(StreamID, State=#http2_state{streams=Streams}) -> 691 Streams2 = lists:keydelete(StreamID, #stream.id, Streams), 692 State#http2_state{streams=Streams2}. 693 694remote_fin(S=#stream{local=fin}, State, fin) -> 695 delete_stream(S#stream.id, State); 696%% We always replace the stream in the state because 697%% the content handler state has changed. 698remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> 699 Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, 700 S#stream{remote=IsFin}), 701 State#http2_state{streams=Streams2}. 702 703maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) -> 704 delete_stream(Stream#stream.id, State); 705maybe_delete_stream(State=#http2_state{streams=Streams}, Stream) -> 706 State#http2_state{streams= 707 lists:keyreplace(Stream#stream.id, #stream.id, Streams, Stream)}. 708