1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use crate::decoder_instructions::{DecoderInstruction, DecoderInstructionReader};
8 use crate::encoder_instructions::EncoderInstruction;
9 use crate::header_block::HeaderEncoder;
10 use crate::qlog;
11 use crate::qpack_send_buf::QpackData;
12 use crate::reader::ReceiverConnWrapper;
13 use crate::stats::Stats;
14 use crate::table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE};
15 use crate::{Error, QpackSettings, Res};
16 use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace, Header};
17 use neqo_transport::{Connection, Error as TransportError, StreamId};
18 use std::collections::{HashMap, HashSet, VecDeque};
19 use std::convert::TryFrom;
20 
/// Stream type of a QPACK encoder stream. `QPackEncoder::send` writes it as a varint when it
/// initializes the local encoder stream.
pub const QPACK_UNI_STREAM_TYPE_ENCODER: u64 = 0x2;
22 
/// State of the local stream that encoder instructions are written to.
#[derive(Debug, PartialEq)]
enum LocalStreamState {
    /// No stream has been registered yet.
    NoStream,
    /// A stream has been registered, but the stream type has not been written to it yet.
    Uninitialized(StreamId),
    /// The stream type has been written to the stream.
    Initialized(StreamId),
}
29 
30 impl LocalStreamState {
stream_id(&self) -> Option<StreamId>31     pub fn stream_id(&self) -> Option<StreamId> {
32         match self {
33             Self::NoStream => None,
34             Self::Uninitialized(stream_id) | Self::Initialized(stream_id) => Some(*stream_id),
35         }
36     }
37 }
38 
/// QPACK encoder state: the header table, the local encoder stream, the reader for decoder
/// instructions and the bookkeeping of header blocks that the peer has not acknowledged yet.
#[derive(Debug)]
pub struct QPackEncoder {
    /// Header table used for lookups and insertions.
    table: HeaderTable,
    /// Local upper bound for the table capacity, taken from
    /// `QpackSettings::max_table_size_encoder`.
    max_table_size: u64,
    /// Capacity divided by 32, updated whenever a capacity change takes effect. It is passed
    /// to every `HeaderEncoder` that is created.
    max_entries: u64,
    /// Reader for the instructions that arrive on the decoder stream.
    instruction_reader: DecoderInstructionReader,
    /// The local stream that encoder instructions are written to.
    local_stream: LocalStreamState,
    /// Maximum number of streams that may be blocked, see `set_max_blocked_streams`.
    max_blocked_streams: u16,
    // Remembers the header blocks that refer to the dynamic table.
    // There can be multiple header blocks in one stream: headers, trailers, push stream
    // requests, etc. This HashMap maps a stream ID to a list of header blocks. Each header block
    // is the set of dynamic table entries it references. New blocks are pushed to the front of
    // the list and acknowledged blocks are popped from the back.
    unacked_header_blocks: HashMap<u64, VecDeque<HashSet<u64>>>,
    /// Number of streams that are currently counted as blocked, i.e. that have an unacknowledged
    /// header block referencing an entry at or above the acknowledged insert count.
    blocked_stream_cnt: u16,
    /// Passed to the instruction and header block encoders to select Huffman encoding.
    use_huffman: bool,
    /// Capacity that still has to be sent to the peer and applied to the table.
    next_capacity: Option<u64>,
    /// Counters that are exposed through `stats()`.
    stats: Stats,
}
57 
58 impl QPackEncoder {
59     #[must_use]
new(qpack_settings: QpackSettings, use_huffman: bool) -> Self60     pub fn new(qpack_settings: QpackSettings, use_huffman: bool) -> Self {
61         Self {
62             table: HeaderTable::new(true),
63             max_table_size: qpack_settings.max_table_size_encoder,
64             max_entries: 0,
65             instruction_reader: DecoderInstructionReader::new(),
66             local_stream: LocalStreamState::NoStream,
67             max_blocked_streams: 0,
68             unacked_header_blocks: HashMap::new(),
69             blocked_stream_cnt: 0,
70             use_huffman,
71             next_capacity: None,
72             stats: Stats::default(),
73         }
74     }
75 
76     /// This function is use for setting encoders table max capacity. The value is received as
77     /// a `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting parameter.
78     /// # Errors
79     /// `EncoderStream` if value is too big.
80     /// `ChangeCapacity` if table capacity cannot be reduced.
set_max_capacity(&mut self, cap: u64) -> Res<()>81     pub fn set_max_capacity(&mut self, cap: u64) -> Res<()> {
82         if cap > (1 << 30) - 1 {
83             return Err(Error::EncoderStream);
84         }
85 
86         if cap == self.table.capacity() {
87             return Ok(());
88         }
89 
90         qdebug!(
91             [self],
92             "Set max capacity to new capacity:{} old:{} max_table_size={}.",
93             cap,
94             self.table.capacity(),
95             self.max_table_size,
96         );
97 
98         let new_cap = std::cmp::min(self.max_table_size, cap);
99         // we also set our table to the max allowed.
100         self.change_capacity(new_cap);
101         Ok(())
102     }
103 
104     /// This function is use for setting encoders max blocked streams. The value is received as
105     /// a `SETTINGS_QPACK_BLOCKED_STREAMS` setting parameter.
106     /// # Errors
107     /// `EncoderStream` if value is too big.
set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()>108     pub fn set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()> {
109         self.max_blocked_streams = u16::try_from(blocked_streams).or(Err(Error::EncoderStream))?;
110         Ok(())
111     }
112 
113     /// Reads decoder instructions.
114     /// # Errors
115     /// May return: `ClosedCriticalStream` if stream has been closed or `DecoderStream`
116     /// in case of any other transport error.
receive(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()>117     pub fn receive(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()> {
118         self.read_instructions(conn, stream_id)
119             .map_err(|e| map_error(&e))
120     }
121 
read_instructions(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()>122     fn read_instructions(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()> {
123         qdebug!([self], "read a new instraction");
124         loop {
125             let mut recv = ReceiverConnWrapper::new(conn, stream_id);
126             match self.instruction_reader.read_instructions(&mut recv) {
127                 Ok(instruction) => self.call_instruction(instruction, &mut conn.qlog_mut())?,
128                 Err(Error::NeedMoreData) => break Ok(()),
129                 Err(e) => break Err(e),
130             }
131         }
132     }
133 
recalculate_blocked_streams(&mut self)134     fn recalculate_blocked_streams(&mut self) {
135         let acked_inserts_cnt = self.table.get_acked_inserts_cnt();
136         self.blocked_stream_cnt = 0;
137         for hb_list in self.unacked_header_blocks.values_mut() {
138             debug_assert!(!hb_list.is_empty());
139             if hb_list.iter().flatten().any(|e| *e >= acked_inserts_cnt) {
140                 self.blocked_stream_cnt += 1;
141             }
142         }
143     }
144 
145     #[allow(
146         clippy::map_err_ignore,
147         unknown_lints,
148         renamed_and_removed_lints,
149         clippy::unknown_clippy_lints
150     )]
insert_count_instruction(&mut self, increment: u64) -> Res<()>151     fn insert_count_instruction(&mut self, increment: u64) -> Res<()> {
152         self.table
153             .increment_acked(increment)
154             .map_err(|_| Error::DecoderStream)?;
155         self.recalculate_blocked_streams();
156         Ok(())
157     }
158 
header_ack(&mut self, stream_id: u64)159     fn header_ack(&mut self, stream_id: u64) {
160         self.stats.header_acks_recv += 1;
161         let mut new_acked = self.table.get_acked_inserts_cnt();
162         if let Some(hb_list) = self.unacked_header_blocks.get_mut(&stream_id) {
163             if let Some(ref_list) = hb_list.pop_back() {
164                 for iter in ref_list {
165                     self.table.remove_ref(iter);
166                     if iter >= new_acked {
167                         new_acked = iter + 1;
168                     }
169                 }
170             } else {
171                 debug_assert!(false, "We should have at least one header block.");
172             }
173             if hb_list.is_empty() {
174                 self.unacked_header_blocks.remove(&stream_id);
175             }
176         }
177         if new_acked > self.table.get_acked_inserts_cnt() {
178             self.insert_count_instruction(new_acked - self.table.get_acked_inserts_cnt())
179                 .expect("This should neve happen");
180         }
181     }
182 
stream_cancellation(&mut self, stream_id: u64)183     fn stream_cancellation(&mut self, stream_id: u64) {
184         self.stats.stream_cancelled_recv += 1;
185         let mut was_blocker = false;
186         if let Some(mut hb_list) = self.unacked_header_blocks.remove(&stream_id) {
187             debug_assert!(!hb_list.is_empty());
188             while let Some(ref_list) = hb_list.pop_front() {
189                 for iter in ref_list {
190                     self.table.remove_ref(iter);
191                     was_blocker = was_blocker || (iter >= self.table.get_acked_inserts_cnt());
192                 }
193             }
194         }
195         if was_blocker {
196             debug_assert!(self.blocked_stream_cnt > 0);
197             self.blocked_stream_cnt -= 1;
198         }
199     }
200 
call_instruction( &mut self, instruction: DecoderInstruction, qlog: &mut NeqoQlog, ) -> Res<()>201     fn call_instruction(
202         &mut self,
203         instruction: DecoderInstruction,
204         qlog: &mut NeqoQlog,
205     ) -> Res<()> {
206         qdebug!([self], "call intruction {:?}", instruction);
207         match instruction {
208             DecoderInstruction::InsertCountIncrement { increment } => {
209                 qlog::qpack_read_insert_count_increment_instruction(
210                     qlog,
211                     increment,
212                     &increment.to_be_bytes(),
213                 );
214 
215                 self.insert_count_instruction(increment)
216             }
217             DecoderInstruction::HeaderAck { stream_id } => {
218                 self.header_ack(stream_id);
219                 Ok(())
220             }
221             DecoderInstruction::StreamCancellation { stream_id } => {
222                 self.stream_cancellation(stream_id);
223                 Ok(())
224             }
225             DecoderInstruction::NoInstruction => Ok(()),
226         }
227     }
228 
229     /// Inserts a new entry into a table and sends the corresponding instruction to a peer. An entry is added only
230     /// if it is possible to send the corresponding instruction immediately, i.e. the encoder stream is not
231     /// blocked by the flow control (or stream internal buffer(this is very unlikely)).
232     /// ### Errors
233     /// `EncoderStreamBlocked` if the encoder stream is blocked by the flow control.
234     /// `DynamicTableFull` if the dynamic table does not have enough space for the entry.
235     /// The function can return transport errors: `InvalidStreamId`, `InvalidInput` and `FinalSizeError`.
236     /// # Panics
237     /// When the insertion fails (it should not).
send_and_insert( &mut self, conn: &mut Connection, name: &[u8], value: &[u8], ) -> Res<u64>238     pub fn send_and_insert(
239         &mut self,
240         conn: &mut Connection,
241         name: &[u8],
242         value: &[u8],
243     ) -> Res<u64> {
244         qdebug!([self], "insert {:?} {:?}.", name, value);
245 
246         let entry_size = name.len() + value.len() + ADDITIONAL_TABLE_ENTRY_SIZE;
247 
248         if !self.table.insert_possible(entry_size) {
249             return Err(Error::DynamicTableFull);
250         }
251 
252         let mut buf = QpackData::default();
253         EncoderInstruction::InsertWithNameLiteral {
254             name: &name,
255             value: &value,
256         }
257         .marshal(&mut buf, self.use_huffman);
258 
259         let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;
260 
261         let sent = conn
262             .stream_send_atomic(stream_id.as_u64(), &buf)
263             .map_err(|e| map_stream_send_atomic_error(&e))?;
264         if !sent {
265             return Err(Error::EncoderStreamBlocked);
266         }
267 
268         self.stats.dynamic_table_inserts += 1;
269 
270         match self.table.insert(name, value) {
271             Ok(inx) => Ok(inx),
272             Err(e) => {
273                 debug_assert!(false);
274                 Err(e)
275             }
276         }
277     }
278 
    /// Records `value` as the capacity to announce. The table is not touched here; the change
    /// is applied by `maybe_send_change_capacity` once the instruction has been sent.
    fn change_capacity(&mut self, value: u64) {
        qdebug!([self], "change capacity: {}", value);
        self.next_capacity = Some(value);
    }
283 
maybe_send_change_capacity( &mut self, conn: &mut Connection, stream_id: StreamId, ) -> Res<()>284     fn maybe_send_change_capacity(
285         &mut self,
286         conn: &mut Connection,
287         stream_id: StreamId,
288     ) -> Res<()> {
289         if let Some(cap) = self.next_capacity {
290             // Check if it is possible to reduce the capacity, e.g. if enough space can be make free for the reduction.
291             if cap < self.table.capacity() && !self.table.can_evict_to(cap) {
292                 return Err(Error::DynamicTableFull);
293             }
294             let mut buf = QpackData::default();
295             EncoderInstruction::Capacity { value: cap }.marshal(&mut buf, self.use_huffman);
296             if !conn.stream_send_atomic(stream_id.as_u64(), &buf)? {
297                 return Err(Error::EncoderStreamBlocked);
298             }
299             if self.table.set_capacity(cap).is_err() {
300                 debug_assert!(
301                     false,
302                     "can_evict_to should have checked and make sure this operation is possible"
303                 );
304                 return Err(Error::InternalError(1));
305             }
306             self.max_entries = cap / 32;
307             self.next_capacity = None;
308         }
309         Ok(())
310     }
311 
312     /// Sends any qpack encoder instructions.
313     /// # Errors
314     ///   returns `EncoderStream` in case of an error.
send(&mut self, conn: &mut Connection) -> Res<()>315     pub fn send(&mut self, conn: &mut Connection) -> Res<()> {
316         match self.local_stream {
317             LocalStreamState::NoStream => {
318                 qerror!("Send call but there is no stream yet.");
319                 Ok(())
320             }
321             LocalStreamState::Uninitialized(stream_id) => {
322                 let mut buf = QpackData::default();
323                 buf.encode_varint(QPACK_UNI_STREAM_TYPE_ENCODER);
324                 if !conn.stream_send_atomic(stream_id.as_u64(), &buf[..])? {
325                     return Err(Error::EncoderStreamBlocked);
326                 }
327                 self.local_stream = LocalStreamState::Initialized(stream_id);
328                 self.maybe_send_change_capacity(conn, stream_id)
329             }
330             LocalStreamState::Initialized(stream_id) => {
331                 self.maybe_send_change_capacity(conn, stream_id)
332             }
333         }
334     }
335 
is_stream_blocker(&self, stream_id: u64) -> bool336     fn is_stream_blocker(&self, stream_id: u64) -> bool {
337         if let Some(hb_list) = self.unacked_header_blocks.get(&stream_id) {
338             debug_assert!(!hb_list.is_empty());
339             match hb_list.iter().flatten().max() {
340                 Some(max_ref) => *max_ref >= self.table.get_acked_inserts_cnt(),
341                 None => false,
342             }
343         } else {
344             false
345         }
346     }
347 
348     /// Encodes headers
349     /// # Errors
350     /// `ClosedCriticalStream` if the encoder stream is closed.
351     /// `InternalError` if an unexpected error occurred.
352     /// # Panics
353     /// If there is a programming error.
354     #[allow(
355         unknown_lints,
356         renamed_and_removed_lints,
357         clippy::unknown_clippy_lints,
358         clippy::unnested_or_patterns
359     )] // Until we require rust 1.53 we can't use or_patterns.
encode_header_block( &mut self, conn: &mut Connection, h: &[Header], stream_id: u64, ) -> Res<HeaderEncoder>360     pub fn encode_header_block(
361         &mut self,
362         conn: &mut Connection,
363         h: &[Header],
364         stream_id: u64,
365     ) -> Res<HeaderEncoder> {
366         qdebug!([self], "encoding headers.");
367 
368         let mut encoder_blocked = false;
369         // Try to send capacity instructions if present.
370         match self.send(conn) {
371             Ok(()) => {}
372             Err(Error::EncoderStreamBlocked) => {
373                 encoder_blocked = true;
374             }
375             Err(e) => {
376                 // `InternalError`, `ClosedCriticalStream`
377                 return Err(e);
378             }
379         }
380 
381         let mut encoded_h =
382             HeaderEncoder::new(self.table.base(), self.use_huffman, self.max_entries);
383 
384         let stream_is_blocker = self.is_stream_blocker(stream_id);
385         let can_block = self.blocked_stream_cnt < self.max_blocked_streams || stream_is_blocker;
386 
387         let mut ref_entries = HashSet::new();
388 
389         for iter in h.iter() {
390             let name = iter.name().as_bytes().to_vec();
391             let value = iter.value().as_bytes().to_vec();
392             qtrace!("encoding {:x?} {:x?}.", name, value);
393 
394             if let Some(LookupResult {
395                 index,
396                 static_table,
397                 value_matches,
398             }) = self.table.lookup(&name, &value, can_block)
399             {
400                 qtrace!(
401                     [self],
402                     "found a {} entry, value-match={}",
403                     if static_table { "static" } else { "dynamic" },
404                     value_matches
405                 );
406                 if value_matches {
407                     if static_table {
408                         encoded_h.encode_indexed_static(index);
409                     } else {
410                         encoded_h.encode_indexed_dynamic(index);
411                     }
412                 } else {
413                     encoded_h.encode_literal_with_name_ref(static_table, index, &value);
414                 }
415                 if !static_table && ref_entries.insert(index) {
416                     self.table.add_ref(index);
417                 }
418             } else if can_block & !encoder_blocked {
419                 // Insert using an InsertWithNameLiteral instruction. This entry name does not match any name in the
420                 // tables therefore we cannot use any other instruction.
421                 match self.send_and_insert(conn, &name, &value) {
422                     Ok(index) => {
423                         encoded_h.encode_indexed_dynamic(index);
424                         ref_entries.insert(index);
425                         self.table.add_ref(index);
426                     }
427                     Err(Error::EncoderStreamBlocked) | Err(Error::DynamicTableFull) => {
428                         // As soon as one of the instructions cannot be written or the table is full, do not try again.
429                         encoder_blocked = true;
430                         encoded_h.encode_literal_with_name_literal(&name, &value)
431                     }
432                     Err(e) => {
433                         // `InternalError`, `ClosedCriticalStream`
434                         return Err(e);
435                     }
436                 }
437             } else {
438                 encoded_h.encode_literal_with_name_literal(&name, &value);
439             }
440         }
441 
442         encoded_h.encode_header_block_prefix();
443 
444         if !stream_is_blocker {
445             // The streams was not a blocker, check if the stream is a blocker now.
446             if let Some(max_ref) = ref_entries.iter().max() {
447                 if *max_ref >= self.table.get_acked_inserts_cnt() {
448                     debug_assert!(self.blocked_stream_cnt <= self.max_blocked_streams);
449                     self.blocked_stream_cnt += 1;
450                 }
451             }
452         }
453 
454         if !ref_entries.is_empty() {
455             self.unacked_header_blocks
456                 .entry(stream_id)
457                 .or_insert_with(VecDeque::new)
458                 .push_front(ref_entries);
459             self.stats.dynamic_table_references += 1;
460         }
461         Ok(encoded_h)
462     }
463 
464     /// Encoder stream has been created. Add the stream id.
465     /// # Panics
466     /// If a stream has already been added.
add_send_stream(&mut self, stream_id: u64)467     pub fn add_send_stream(&mut self, stream_id: u64) {
468         if self.local_stream == LocalStreamState::NoStream {
469             self.local_stream = LocalStreamState::Uninitialized(StreamId::new(stream_id));
470         } else {
471             panic!("Adding multiple local streams");
472         }
473     }
474 
    /// Returns a copy of the encoder's statistics counters.
    #[must_use]
    pub fn stats(&self) -> Stats {
        self.stats.clone()
    }
479 
    /// Returns the id of the local encoder stream, or `None` if no stream has been added yet.
    #[must_use]
    pub fn local_stream_id(&self) -> Option<u64> {
        self.local_stream.stream_id().map(StreamId::as_u64)
    }
484 
    /// Test-only accessor for the number of streams currently counted as blocked.
    #[cfg(test)]
    fn blocked_stream_cnt(&self) -> u16 {
        self.blocked_stream_cnt
    }
489 }
490 
491 impl ::std::fmt::Display for QPackEncoder {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result492     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
493         write!(f, "QPackEncoder")
494     }
495 }
496 
map_error(err: &Error) -> Error497 fn map_error(err: &Error) -> Error {
498     if *err == Error::ClosedCriticalStream {
499         Error::ClosedCriticalStream
500     } else {
501         Error::DecoderStream
502     }
503 }
504 
map_stream_send_atomic_error(err: &TransportError) -> Error505 fn map_stream_send_atomic_error(err: &TransportError) -> Error {
506     match err {
507         TransportError::InvalidStreamId | TransportError::FinalSizeError => {
508             Error::ClosedCriticalStream
509         }
510         _ => {
511             debug_assert!(false, "Unexpected error");
512             Error::InternalError(2)
513         }
514     }
515 }
516 
517 #[cfg(test)]
518 mod tests {
519     use super::{Connection, Error, Header, QPackEncoder, Res};
520     use crate::QpackSettings;
521     use neqo_transport::{ConnectionParameters, StreamType};
522     use std::mem;
523     use test_fixture::{configure_server, default_client, default_server, handshake, now};
524 
    /// Test fixture: an encoder together with a connected pair of connections.
    struct TestEncoder {
        /// The encoder under test.
        encoder: QPackEncoder,
        /// Stream created on `conn`; the encoder writes its instructions to it.
        send_stream_id: u64,
        /// Stream created on `peer_conn`; decoder instructions are sent on it.
        recv_stream_id: u64,
        /// Connection used by the encoder.
        conn: Connection,
        /// The other end of `conn`.
        peer_conn: Connection,
    }
532 
533     impl TestEncoder {
change_capacity(&mut self, capacity: u64) -> Res<()>534         pub fn change_capacity(&mut self, capacity: u64) -> Res<()> {
535             self.encoder.set_max_capacity(capacity).unwrap();
536             // We will try to really change the table only when we send the change capacity instruction.
537             self.encoder.send(&mut self.conn)
538         }
539 
insert(&mut self, header: &[u8], value: &[u8], inst: &[u8])540         pub fn insert(&mut self, header: &[u8], value: &[u8], inst: &[u8]) {
541             let res = self.encoder.send_and_insert(&mut self.conn, header, value);
542             assert!(res.is_ok());
543             self.send_instructions(inst);
544         }
545 
encode_header_block( &mut self, stream_id: u64, headers: &[Header], expected_encoding: &[u8], inst: &[u8], )546         pub fn encode_header_block(
547             &mut self,
548             stream_id: u64,
549             headers: &[Header],
550             expected_encoding: &[u8],
551             inst: &[u8],
552         ) {
553             let buf = self
554                 .encoder
555                 .encode_header_block(&mut self.conn, headers, stream_id)
556                 .unwrap();
557             assert_eq!(&buf[..], expected_encoding);
558             self.send_instructions(inst);
559         }
560 
send_instructions(&mut self, encoder_instruction: &[u8])561         pub fn send_instructions(&mut self, encoder_instruction: &[u8]) {
562             self.encoder.send(&mut self.conn).unwrap();
563             let out = self.conn.process(None, now());
564             let out2 = self.peer_conn.process(out.dgram(), now());
565             mem::drop(self.conn.process(out2.dgram(), now()));
566             let mut buf = [0_u8; 100];
567             let (amount, fin) = self
568                 .peer_conn
569                 .stream_recv(self.send_stream_id, &mut buf)
570                 .unwrap();
571             assert!(!fin);
572             assert_eq!(buf[..amount], encoder_instruction[..]);
573         }
574     }
575 
connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder576     fn connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder {
577         let mut conn = default_client();
578         let mut peer_conn = max_data.map_or_else(default_server, |max| {
579             configure_server(
580                 ConnectionParameters::default()
581                     .max_stream_data(StreamType::UniDi, true, max)
582                     .max_stream_data(StreamType::BiDi, true, max)
583                     .max_stream_data(StreamType::BiDi, false, max),
584             )
585         });
586         handshake(&mut conn, &mut peer_conn);
587 
588         // create a stream
589         let recv_stream_id = peer_conn.stream_create(StreamType::UniDi).unwrap();
590         let send_stream_id = conn.stream_create(StreamType::UniDi).unwrap();
591 
592         // create an encoder
593         let mut encoder = QPackEncoder::new(
594             QpackSettings {
595                 max_table_size_encoder: 1500,
596                 max_table_size_decoder: 0,
597                 max_blocked_streams: 0,
598             },
599             huffman,
600         );
601         encoder.add_send_stream(send_stream_id);
602 
603         TestEncoder {
604             encoder,
605             send_stream_id,
606             recv_stream_id,
607             conn,
608             peer_conn,
609         }
610     }
611 
    /// Creates a `TestEncoder` with a default server, i.e. without custom stream data limits.
    fn connect(huffman: bool) -> TestEncoder {
        connect_generic(huffman, None)
    }
615 
    /// Creates a Huffman-enabled `TestEncoder` whose server uses `max_data` as its stream data
    /// limits.
    fn connect_flow_control(max_data: u64) -> TestEncoder {
        connect_generic(true, Some(max_data))
    }
619 
recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8])620     fn recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8]) {
621         encoder
622             .peer_conn
623             .stream_send(encoder.recv_stream_id, decoder_instruction)
624             .unwrap();
625         let out = encoder.peer_conn.process(None, now());
626         mem::drop(encoder.conn.process(out.dgram(), now()));
627         assert!(encoder
628             .encoder
629             .read_instructions(&mut encoder.conn, encoder.recv_stream_id)
630             .is_ok());
631     }
632 
633     const CAP_INSTRUCTION_200: &[u8] = &[0x02, 0x3f, 0xa9, 0x01];
634     const CAP_INSTRUCTION_60: &[u8] = &[0x02, 0x3f, 0x1d];
635     const CAP_INSTRUCTION_1000: &[u8] = &[0x02, 0x3f, 0xc9, 0x07];
636     const CAP_INSTRUCTION_1500: &[u8] = &[0x02, 0x3f, 0xbd, 0x0b];
637 
638     const HEADER_CONTENT_LENGTH: &[u8] = &[
639         0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
640     ];
641     const VALUE_1: &[u8] = &[0x31, 0x32, 0x33, 0x34];
642     const VALUE_2: &[u8] = &[0x31, 0x32, 0x33, 0x34, 0x35];
643 
644     // HEADER_CONTENT_LENGTH and VALUE_1 encoded by instruction insert_with_name_literal.
645     const HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL: &[u8] = &[
646         0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
647         0x04, 0x31, 0x32, 0x33, 0x34,
648     ];
649 
650     // HEADER_CONTENT_LENGTH and VALUE_2 encoded by instruction insert_with_name_literal.
651     const HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL: &[u8] = &[
652         0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
653         0x05, 0x31, 0x32, 0x33, 0x34, 0x35,
654     ];
655 
656     // Indexed Header Field that refers to the first entry in the dynamic table.
657     const ENCODE_INDEXED_REF_DYNAMIC: &[u8] = &[0x02, 0x00, 0x80];
658 
659     const HEADER_ACK_STREAM_ID_1: &[u8] = &[0x81];
660     const HEADER_ACK_STREAM_ID_2: &[u8] = &[0x82];
661     const STREAM_CANCELED_ID_1: &[u8] = &[0x41];
662 
663     // test insert_with_name_literal which fails because there is not enough space in the table
664     #[test]
test_insert_with_name_literal_1()665     fn test_insert_with_name_literal_1() {
666         let mut encoder = connect(false);
667 
668         // insert "content-length: 1234
669         let res =
670             encoder
671                 .encoder
672                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
673         assert_eq!(Error::DynamicTableFull, res.unwrap_err());
674         encoder.send_instructions(&[0x02]);
675     }
676 
677     // test insert_with_name_literal - succeeds
678     #[test]
test_insert_with_name_literal_2()679     fn test_insert_with_name_literal_2() {
680         let mut encoder = connect(false);
681 
682         assert!(encoder.encoder.set_max_capacity(200).is_ok());
683         // test the change capacity instruction.
684         encoder.send_instructions(CAP_INSTRUCTION_200);
685 
686         // insert "content-length: 1234
687         let res =
688             encoder
689                 .encoder
690                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
691         assert!(res.is_ok());
692         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
693     }
694 
695     #[test]
test_change_capacity()696     fn test_change_capacity() {
697         let mut encoder = connect(false);
698 
699         assert!(encoder.encoder.set_max_capacity(200).is_ok());
700         encoder.send_instructions(CAP_INSTRUCTION_200);
701     }
702 
    /// One table-driven test case for header block encoding.
    struct TestElement {
        /// Headers to encode.
        pub headers: Vec<Header>,
        /// Expected encoded header block.
        pub header_block: &'static [u8],
        /// Expected bytes on the encoder stream after encoding `headers`.
        pub encoder_inst: &'static [u8],
    }
708 
709     #[test]
test_header_block_encoder_non()710     fn test_header_block_encoder_non() {
711         let test_cases: [TestElement; 6] = [
712             // test a header with ref to static - encode_indexed
713             TestElement {
714                 headers: vec![Header::new(":method", "GET")],
715                 header_block: &[0x00, 0x00, 0xd1],
716                 encoder_inst: &[],
717             },
718             // test encode_literal_with_name_ref
719             TestElement {
720                 headers: vec![Header::new(":path", "/somewhere")],
721                 header_block: &[
722                     0x00, 0x00, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65, 0x72,
723                     0x65,
724                 ],
725                 encoder_inst: &[],
726             },
727             // test adding a new header and encode_post_base_index, also test fix_header_block_prefix
728             TestElement {
729                 headers: vec![Header::new("my-header", "my-value")],
730                 header_block: &[0x02, 0x80, 0x10],
731                 encoder_inst: &[
732                     0x49, 0x6d, 0x79, 0x2d, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x08, 0x6d, 0x79,
733                     0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65,
734                 ],
735             },
736             // test encode_indexed with a ref to dynamic table.
737             TestElement {
738                 headers: vec![Header::new("my-header", "my-value")],
739                 header_block: ENCODE_INDEXED_REF_DYNAMIC,
740                 encoder_inst: &[],
741             },
742             // test encode_literal_with_name_ref.
743             TestElement {
744                 headers: vec![Header::new("my-header", "my-value2")],
745                 header_block: &[
746                     0x02, 0x00, 0x40, 0x09, 0x6d, 0x79, 0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32,
747                 ],
748                 encoder_inst: &[],
749             },
750             // test multiple headers
751             TestElement {
752                 headers: vec![
753                     Header::new(":method", "GET"),
754                     Header::new(":path", "/somewhere"),
755                     Header::new(":authority", "example.com"),
756                     Header::new(":scheme", "https"),
757                 ],
758                 header_block: &[
759                     0x00, 0x01, 0xd1, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65,
760                     0x72, 0x65, 0x50, 0x0b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63,
761                     0x6f, 0x6d, 0xd7,
762                 ],
763                 encoder_inst: &[],
764             },
765         ];
766 
767         let mut encoder = connect(false);
768 
769         encoder.encoder.set_max_blocked_streams(100).unwrap();
770         encoder.encoder.set_max_capacity(200).unwrap();
771 
772         // test the change capacity instruction.
773         encoder.send_instructions(CAP_INSTRUCTION_200);
774 
775         for t in &test_cases {
776             let buf = encoder
777                 .encoder
778                 .encode_header_block(&mut encoder.conn, &t.headers, 1)
779                 .unwrap();
780             assert_eq!(&buf[..], t.header_block);
781             encoder.send_instructions(t.encoder_inst);
782         }
783     }
784 
785     #[test]
test_header_block_encoder_huffman()786     fn test_header_block_encoder_huffman() {
787         let test_cases: [TestElement; 6] = [
788             // test a header with ref to static - encode_indexed
789             TestElement {
790                 headers: vec![Header::new(":method", "GET")],
791                 header_block: &[0x00, 0x00, 0xd1],
792                 encoder_inst: &[],
793             },
794             // test encode_literal_with_name_ref
795             TestElement {
796                 headers: vec![Header::new(":path", "/somewhere")],
797                 header_block: &[
798                     0x00, 0x00, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85,
799                 ],
800                 encoder_inst: &[],
801             },
802             // test adding a new header and encode_post_base_index, also test fix_header_block_prefix
803             TestElement {
804                 headers: vec![Header::new("my-header", "my-value")],
805                 header_block: &[0x02, 0x80, 0x10],
806                 encoder_inst: &[
807                     0x67, 0xa7, 0xd2, 0xd3, 0x94, 0x72, 0x16, 0xcf, 0x86, 0xa7, 0xd2, 0xdd, 0xc7,
808                     0x45, 0xa5,
809                 ],
810             },
811             // test encode_indexed with a ref to dynamic table.
812             TestElement {
813                 headers: vec![Header::new("my-header", "my-value")],
814                 header_block: ENCODE_INDEXED_REF_DYNAMIC,
815                 encoder_inst: &[],
816             },
817             // test encode_literal_with_name_ref.
818             TestElement {
819                 headers: vec![Header::new("my-header", "my-value2")],
820                 header_block: &[
821                     0x02, 0x00, 0x40, 0x87, 0xa7, 0xd2, 0xdd, 0xc7, 0x45, 0xa5, 0x17,
822                 ],
823                 encoder_inst: &[],
824             },
825             // test multiple headers
826             TestElement {
827                 headers: vec![
828                     Header::new(":method", "GET"),
829                     Header::new(":path", "/somewhere"),
830                     Header::new(":authority", "example.com"),
831                     Header::new(":scheme", "https"),
832                 ],
833                 header_block: &[
834                     0x00, 0x01, 0xd1, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85, 0x50,
835                     0x88, 0x2f, 0x91, 0xd3, 0x5d, 0x05, 0x5c, 0x87, 0xa7, 0xd7,
836                 ],
837                 encoder_inst: &[],
838             },
839         ];
840 
841         let mut encoder = connect(true);
842 
843         encoder.encoder.set_max_blocked_streams(100).unwrap();
844         encoder.encoder.set_max_capacity(200).unwrap();
845 
846         // test the change capacity instruction.
847         encoder.send_instructions(CAP_INSTRUCTION_200);
848 
849         for t in &test_cases {
850             let buf = encoder
851                 .encoder
852                 .encode_header_block(&mut encoder.conn, &t.headers, 1)
853                 .unwrap();
854             assert_eq!(&buf[..], t.header_block);
855             encoder.send_instructions(t.encoder_inst);
856         }
857     }
858 
859     // Test inserts block on waiting for an insert count increment.
860     #[test]
test_insertion_blocked_on_insert_count_feedback()861     fn test_insertion_blocked_on_insert_count_feedback() {
862         let mut encoder = connect(false);
863 
864         encoder.encoder.set_max_capacity(60).unwrap();
865 
866         // test the change capacity instruction.
867         encoder.send_instructions(CAP_INSTRUCTION_60);
868 
869         // insert "content-length: 1234
870         let res =
871             encoder
872                 .encoder
873                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
874         assert!(res.is_ok());
875         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
876 
877         // insert "content-length: 12345 which will fail because the ntry in the table cannot be evicted.
878         let res =
879             encoder
880                 .encoder
881                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
882         assert!(res.is_err());
883         encoder.send_instructions(&[]);
884 
885         // receive an insert count increment.
886         recv_instruction(&mut encoder, &[0x01]);
887 
888         // insert "content-length: 12345 again it will succeed.
889         let res =
890             encoder
891                 .encoder
892                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
893         assert!(res.is_ok());
894         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
895     }
896 
897     // Test inserts block on waiting for acks
898     // test the table insertion is blocked:
899     // 0 - waiting for a header ack
900     // 2 - waiting for a stream cancel.
test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(wait: u8)901     fn test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(wait: u8) {
902         let mut encoder = connect(false);
903 
904         assert!(encoder.encoder.set_max_capacity(60).is_ok());
905         // test the change capacity instruction.
906         encoder.send_instructions(CAP_INSTRUCTION_60);
907 
908         // insert "content-length: 1234
909         let res =
910             encoder
911                 .encoder
912                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
913         assert!(res.is_ok());
914         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
915 
916         // receive an insert count increment.
917         recv_instruction(&mut encoder, &[0x01]);
918 
919         // send a header block
920         let buf = encoder
921             .encoder
922             .encode_header_block(
923                 &mut encoder.conn,
924                 &[Header::new("content-length", "1234")],
925                 1,
926             )
927             .unwrap();
928         assert_eq!(&buf[..], ENCODE_INDEXED_REF_DYNAMIC);
929         encoder.send_instructions(&[]);
930 
931         // insert "content-length: 12345 which will fail because the entry in the table cannot be evicted
932         let res =
933             encoder
934                 .encoder
935                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
936         assert!(res.is_err());
937         encoder.send_instructions(&[]);
938 
939         if wait == 0 {
940             // receive a header_ack.
941             recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
942         } else {
943             // receive a stream canceled
944             recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
945         }
946 
947         // insert "content-length: 12345 again it will succeed.
948         let res =
949             encoder
950                 .encoder
951                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
952         assert!(res.is_ok());
953         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
954     }
955 
956     #[test]
test_header_ack()957     fn test_header_ack() {
958         test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(0);
959     }
960 
961     #[test]
test_stream_canceled()962     fn test_stream_canceled() {
963         test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(1);
964     }
965 
assert_is_index_to_dynamic(buf: &[u8])966     fn assert_is_index_to_dynamic(buf: &[u8]) {
967         assert_eq!(buf[2] & 0xc0, 0x80);
968     }
969 
assert_is_index_to_dynamic_post(buf: &[u8])970     fn assert_is_index_to_dynamic_post(buf: &[u8]) {
971         assert_eq!(buf[2] & 0xf0, 0x10);
972     }
973 
assert_is_index_to_static_name_only(buf: &[u8])974     fn assert_is_index_to_static_name_only(buf: &[u8]) {
975         assert_eq!(buf[2] & 0xf0, 0x50);
976     }
977 
assert_is_literal_value_literal_name(buf: &[u8])978     fn assert_is_literal_value_literal_name(buf: &[u8]) {
979         assert_eq!(buf[2] & 0xf0, 0x20);
980     }
981 
982     #[test]
max_block_streams1()983     fn max_block_streams1() {
984         let mut encoder = connect(false);
985 
986         assert!(encoder.encoder.set_max_capacity(60).is_ok());
987 
988         // change capacity to 60.
989         encoder.send_instructions(CAP_INSTRUCTION_60);
990 
991         // insert "content-length: 1234
992         let res =
993             encoder
994                 .encoder
995                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
996 
997         assert!(res.is_ok());
998         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
999 
1000         encoder.encoder.set_max_blocked_streams(1).unwrap();
1001 
1002         // send a header block, it refers to unacked entry.
1003         let buf = encoder
1004             .encoder
1005             .encode_header_block(
1006                 &mut encoder.conn,
1007                 &[Header::new("content-length", "1234")],
1008                 1,
1009             )
1010             .unwrap();
1011         assert_is_index_to_dynamic(&buf);
1012 
1013         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1014 
1015         encoder.send_instructions(&[]);
1016 
1017         // The next one will not use the dynamic entry because it is exceeding the max_blocked_streams
1018         // limit.
1019         let buf = encoder
1020             .encoder
1021             .encode_header_block(
1022                 &mut encoder.conn,
1023                 &[Header::new("content-length", "1234")],
1024                 2,
1025             )
1026             .unwrap();
1027         assert_is_index_to_static_name_only(&buf);
1028 
1029         encoder.send_instructions(&[]);
1030         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1031 
1032         // another header block to already blocked stream can still use the entry.
1033         let buf = encoder
1034             .encoder
1035             .encode_header_block(
1036                 &mut encoder.conn,
1037                 &[Header::new("content-length", "1234")],
1038                 1,
1039             )
1040             .unwrap();
1041         assert_is_index_to_dynamic(&buf);
1042 
1043         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1044     }
1045 
1046     #[test]
max_block_streams2()1047     fn max_block_streams2() {
1048         let mut encoder = connect(false);
1049 
1050         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1051 
1052         // change capacity to 200.
1053         encoder.send_instructions(CAP_INSTRUCTION_200);
1054 
1055         // insert "content-length: 1234
1056         let res =
1057             encoder
1058                 .encoder
1059                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
1060 
1061         assert!(res.is_ok());
1062         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
1063 
1064         // insert "content-length: 12345
1065         let res =
1066             encoder
1067                 .encoder
1068                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
1069 
1070         assert!(res.is_ok());
1071         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
1072 
1073         encoder.encoder.set_max_blocked_streams(1).unwrap();
1074 
1075         let stream_id = 1;
1076         // send a header block, it refers to unacked entry.
1077         let buf = encoder
1078             .encoder
1079             .encode_header_block(
1080                 &mut encoder.conn,
1081                 &[Header::new("content-length", "1234")],
1082                 stream_id,
1083             )
1084             .unwrap();
1085         assert_is_index_to_dynamic(&buf);
1086 
1087         // encode another header block for the same stream that will refer to the second entry
1088         // in the dynamic table.
1089         // This should work because the stream is already a blocked stream
1090         // send a header block, it refers to unacked entry.
1091         let buf = encoder
1092             .encoder
1093             .encode_header_block(
1094                 &mut encoder.conn,
1095                 &[Header::new("content-length", "12345")],
1096                 stream_id,
1097             )
1098             .unwrap();
1099         assert_is_index_to_dynamic(&buf);
1100     }
1101 
1102     #[test]
max_block_streams3()1103     fn max_block_streams3() {
1104         let mut encoder = connect(false);
1105 
1106         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1107 
1108         // change capacity to 200.
1109         encoder.send_instructions(CAP_INSTRUCTION_200);
1110 
1111         encoder.encoder.set_max_blocked_streams(1).unwrap();
1112 
1113         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1114 
1115         // send a header block, that creates an new entry and refers to it.
1116         let buf = encoder
1117             .encoder
1118             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1119             .unwrap();
1120         assert_is_index_to_dynamic_post(&buf);
1121 
1122         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1123 
1124         // The next one will not create a new entry because the encoder is on max_blocked_streams limit.
1125         let buf = encoder
1126             .encoder
1127             .encode_header_block(&mut encoder.conn, &[Header::new("name2", "value2")], 2)
1128             .unwrap();
1129         assert_is_literal_value_literal_name(&buf);
1130 
1131         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1132 
1133         // another header block to already blocked stream can still create a new entry.
1134         let buf = encoder
1135             .encoder
1136             .encode_header_block(&mut encoder.conn, &[Header::new("name2", "value2")], 1)
1137             .unwrap();
1138         assert_is_index_to_dynamic_post(&buf);
1139 
1140         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1141     }
1142 
1143     #[test]
max_block_streams4()1144     fn max_block_streams4() {
1145         let mut encoder = connect(false);
1146 
1147         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1148 
1149         // change capacity to 200.
1150         encoder.send_instructions(CAP_INSTRUCTION_200);
1151 
1152         encoder.encoder.set_max_blocked_streams(1).unwrap();
1153 
1154         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1155 
1156         // send a header block, that creates an new entry and refers to it.
1157         let buf = encoder
1158             .encoder
1159             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1160             .unwrap();
1161         assert_is_index_to_dynamic_post(&buf);
1162 
1163         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1164 
1165         // another header block to already blocked stream can still create a new entry.
1166         let buf = encoder
1167             .encoder
1168             .encode_header_block(&mut encoder.conn, &[Header::new("name2", "value2")], 1)
1169             .unwrap();
1170         assert_is_index_to_dynamic_post(&buf);
1171 
1172         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1173 
1174         // receive a header_ack for the first header block.
1175         recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
1176 
1177         // The stream is still blocking because the second header block is not acked.
1178         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1179     }
1180 
1181     #[test]
max_block_streams5()1182     fn max_block_streams5() {
1183         let mut encoder = connect(false);
1184 
1185         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1186 
1187         // change capacity to 200.
1188         encoder.send_instructions(CAP_INSTRUCTION_200);
1189 
1190         encoder.encoder.set_max_blocked_streams(1).unwrap();
1191 
1192         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1193 
1194         // send a header block, that creates an new entry and refers to it.
1195         let buf = encoder
1196             .encoder
1197             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1198             .unwrap();
1199         assert_is_index_to_dynamic_post(&buf);
1200 
1201         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1202 
1203         // another header block to already blocked stream can still create a new entry.
1204         let buf = encoder
1205             .encoder
1206             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1207             .unwrap();
1208         assert_is_index_to_dynamic(&buf);
1209 
1210         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1211 
1212         // receive a header_ack for the first header block.
1213         recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
1214 
1215         // The stream is not blocking anymore because header ack also acks the instruction.
1216         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1217     }
1218 
1219     #[test]
max_block_streams6()1220     fn max_block_streams6() {
1221         let mut encoder = connect(false);
1222 
1223         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1224 
1225         // change capacity to 200.
1226         encoder.send_instructions(CAP_INSTRUCTION_200);
1227 
1228         encoder.encoder.set_max_blocked_streams(2).unwrap();
1229 
1230         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1231 
1232         // send a header block, that creates an new entry and refers to it.
1233         let buf = encoder
1234             .encoder
1235             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1236             .unwrap();
1237         assert_is_index_to_dynamic_post(&buf);
1238 
1239         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1240 
1241         // header block for the next stream will create an new entry as well.
1242         let buf = encoder
1243             .encoder
1244             .encode_header_block(&mut encoder.conn, &[Header::new("name2", "value2")], 2)
1245             .unwrap();
1246         assert_is_index_to_dynamic_post(&buf);
1247 
1248         assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
1249 
1250         // receive a header_ack for the second header block. This will ack the first as well
1251         recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_2);
1252 
1253         // The stream is not blocking anymore because header ack also acks the instruction.
1254         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1255     }
1256 
1257     #[test]
max_block_streams7()1258     fn max_block_streams7() {
1259         let mut encoder = connect(false);
1260 
1261         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1262 
1263         // change capacity to 200.
1264         encoder.send_instructions(CAP_INSTRUCTION_200);
1265 
1266         encoder.encoder.set_max_blocked_streams(2).unwrap();
1267 
1268         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1269 
1270         // send a header block, that creates an new entry and refers to it.
1271         let buf = encoder
1272             .encoder
1273             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1274             .unwrap();
1275         assert_is_index_to_dynamic_post(&buf);
1276 
1277         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1278 
1279         // header block for the next stream will create an new entry as well.
1280         let buf = encoder
1281             .encoder
1282             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 2)
1283             .unwrap();
1284         assert_is_index_to_dynamic(&buf);
1285 
1286         assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
1287 
1288         // receive a stream cancel for the first stream.
1289         // This will remove the first stream as blocking but it will not mark the instruction as acked.
1290         // and the second steam will still be blocking.
1291         recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
1292 
1293         // The stream is not blocking anymore because header ack also acks the instruction.
1294         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1295     }
1296 
1297     #[test]
max_block_stream8()1298     fn max_block_stream8() {
1299         let mut encoder = connect(false);
1300 
1301         assert!(encoder.encoder.set_max_capacity(200).is_ok());
1302 
1303         // change capacity to 200.
1304         encoder.send_instructions(CAP_INSTRUCTION_200);
1305 
1306         encoder.encoder.set_max_blocked_streams(2).unwrap();
1307 
1308         assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
1309 
1310         // send a header block, that creates an new entry and refers to it.
1311         let buf = encoder
1312             .encoder
1313             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 1)
1314             .unwrap();
1315         assert_is_index_to_dynamic_post(&buf);
1316 
1317         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1318 
1319         // header block for the next stream will refer to the same entry.
1320         let buf = encoder
1321             .encoder
1322             .encode_header_block(&mut encoder.conn, &[Header::new("name1", "value1")], 2)
1323             .unwrap();
1324         assert_is_index_to_dynamic(&buf);
1325 
1326         assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
1327 
1328         // send another header block on stream 1.
1329         let buf = encoder
1330             .encoder
1331             .encode_header_block(&mut encoder.conn, &[Header::new("name2", "value2")], 1)
1332             .unwrap();
1333         assert_is_index_to_dynamic_post(&buf);
1334 
1335         assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
1336 
1337         // stream 1 is block on entries 1 and 2; stream 2 is block only on 1.
1338         // receive an Insert Count Increment for the first entry.
1339         // After that only stream 1 will be blocking.
1340         recv_instruction(&mut encoder, &[0x01]);
1341 
1342         assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
1343     }
1344 
1345     #[test]
dynamic_table_can_evict1()1346     fn dynamic_table_can_evict1() {
1347         let mut encoder = connect(false);
1348 
1349         assert!(encoder.encoder.set_max_capacity(60).is_ok());
1350 
1351         // change capacity to 60.
1352         encoder.send_instructions(CAP_INSTRUCTION_60);
1353 
1354         encoder.encoder.set_max_blocked_streams(2).unwrap();
1355 
1356         // insert "content-length: 1234
1357         let res =
1358             encoder
1359                 .encoder
1360                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
1361 
1362         assert!(res.is_ok());
1363         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
1364 
1365         // send a header block, it refers to unacked entry.
1366         let buf = encoder
1367             .encoder
1368             .encode_header_block(
1369                 &mut encoder.conn,
1370                 &[Header::new("content-length", "1234")],
1371                 1,
1372             )
1373             .unwrap();
1374         assert_is_index_to_dynamic(&buf);
1375 
1376         // trying to evict the entry will failed.
1377         assert!(encoder.change_capacity(10).is_err());
1378 
1379         // receive an Insert Count Increment for the entry.
1380         recv_instruction(&mut encoder, &[0x01]);
1381 
1382         // trying to evict the entry will failed. The stream is still referring to it.
1383         assert!(encoder.change_capacity(10).is_err());
1384 
1385         // receive a header_ack for the header block.
1386         recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
1387 
1388         // now entry can be evicted.
1389         assert!(encoder.change_capacity(10).is_ok());
1390     }
1391 
1392     #[test]
dynamic_table_can_evict2()1393     fn dynamic_table_can_evict2() {
1394         let mut encoder = connect(false);
1395 
1396         assert!(encoder.encoder.set_max_capacity(60).is_ok());
1397 
1398         // change capacity to 60.
1399         encoder.send_instructions(CAP_INSTRUCTION_60);
1400 
1401         encoder.encoder.set_max_blocked_streams(2).unwrap();
1402 
1403         // insert "content-length: 1234
1404         let res =
1405             encoder
1406                 .encoder
1407                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
1408 
1409         assert!(res.is_ok());
1410         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
1411 
1412         // send a header block, it refers to unacked entry.
1413         let buf = encoder
1414             .encoder
1415             .encode_header_block(
1416                 &mut encoder.conn,
1417                 &[Header::new("content-length", "1234")],
1418                 1,
1419             )
1420             .unwrap();
1421         assert_is_index_to_dynamic(&buf);
1422 
1423         // trying to evict the entry will failed.
1424         assert!(encoder.change_capacity(10).is_err());
1425 
1426         // receive an Insert Count Increment for the entry.
1427         recv_instruction(&mut encoder, &[0x01]);
1428 
1429         // trying to evict the entry will failed. The stream is still referring to it.
1430         assert!(encoder.change_capacity(10).is_err());
1431 
1432         // receive a stream cancelled.
1433         recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
1434 
1435         // now entry can be evicted.
1436         assert!(encoder.change_capacity(10).is_ok());
1437     }
1438 
1439     #[test]
dynamic_table_can_evict3()1440     fn dynamic_table_can_evict3() {
1441         let mut encoder = connect(false);
1442 
1443         assert!(encoder.encoder.set_max_capacity(60).is_ok());
1444 
1445         // change capacity to 60.
1446         encoder.send_instructions(CAP_INSTRUCTION_60);
1447 
1448         encoder.encoder.set_max_blocked_streams(2).unwrap();
1449 
1450         // insert "content-length: 1234
1451         let res =
1452             encoder
1453                 .encoder
1454                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
1455 
1456         assert!(res.is_ok());
1457         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
1458 
1459         // trying to evict the entry will failed, because the entry is not acked.
1460         assert!(encoder.change_capacity(10).is_err());
1461 
1462         // receive an Insert Count Increment for the entry.
1463         recv_instruction(&mut encoder, &[0x01]);
1464 
1465         // now entry can be evicted.
1466         assert!(encoder.change_capacity(10).is_ok());
1467     }
1468 
1469     #[test]
dynamic_table_can_evict4()1470     fn dynamic_table_can_evict4() {
1471         let mut encoder = connect(false);
1472 
1473         assert!(encoder.encoder.set_max_capacity(60).is_ok());
1474 
1475         // change capacity to 60.
1476         encoder.send_instructions(CAP_INSTRUCTION_60);
1477 
1478         encoder.encoder.set_max_blocked_streams(2).unwrap();
1479 
1480         // insert "content-length: 1234
1481         let res =
1482             encoder
1483                 .encoder
1484                 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
1485 
1486         assert!(res.is_ok());
1487         encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
1488 
1489         // send a header block, it refers to unacked entry.
1490         let buf = encoder
1491             .encoder
1492             .encode_header_block(
1493                 &mut encoder.conn,
1494                 &[Header::new("content-length", "1234")],
1495                 1,
1496             )
1497             .unwrap();
1498         assert_is_index_to_dynamic(&buf);
1499 
1500         // trying to evict the entry will failed. The stream is still referring to it and
1501         // entry is not acked.
1502         assert!(encoder.change_capacity(10).is_err());
1503 
1504         // receive a header_ack for the header block. This will also ack the instruction.
1505         recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
1506 
1507         // now entry can be evicted.
1508         assert!(encoder.change_capacity(10).is_ok());
1509     }
1510 
1511     #[test]
encoder_flow_controlled_blocked()1512     fn encoder_flow_controlled_blocked() {
1513         const SMALL_MAX_DATA: u64 = 20;
1514         const ONE_INSTRUCTION_1: &[u8] = &[
1515             0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x7f, 0x83, 0x8, 0x99, 0x6b,
1516         ];
1517         const ONE_INSTRUCTION_2: &[u8] = &[
1518             0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x37, 0x83, 0x8, 0x99, 0x6b,
1519         ];
1520 
1521         let mut encoder = connect_flow_control(SMALL_MAX_DATA);
1522 
1523         // change capacity to 1000 and max_block streams to 20.
1524         encoder.encoder.set_max_blocked_streams(20).unwrap();
1525         assert!(encoder.encoder.set_max_capacity(1000).is_ok());
1526         encoder.send_instructions(CAP_INSTRUCTION_1000);
1527 
1528         // Encode a header block with 2 headers. The first header will be added to the dynamic table.
1529         // The second will not be added to the dynamic table, because the corresponding instruction
1530         // cannot be written immediately due to the flow control limit.
1531         let buf1 = encoder
1532             .encoder
1533             .encode_header_block(
1534                 &mut encoder.conn,
1535                 &[
1536                     Header::new("something", "1234"),
1537                     Header::new("something2", "12345678910"),
1538                 ],
1539                 1,
1540             )
1541             .unwrap();
1542 
1543         // Assert that the first header is encoded as an index to the dynamic table (a post form).
1544         assert_eq!(buf1[2], 0x10);
1545         // Assert that the second header is encoded as a literal with a name literal
1546         assert_eq!(buf1[3] & 0xf0, 0x20);
1547 
1548         // Try to encode another header block. Here both headers will be encoded as a literal with a name literal
1549         let buf2 = encoder
1550             .encoder
1551             .encode_header_block(
1552                 &mut encoder.conn,
1553                 &[
1554                     Header::new("something3", "1234"),
1555                     Header::new("something4", "12345678910"),
1556                 ],
1557                 2,
1558             )
1559             .unwrap();
1560         assert_eq!(buf2[2] & 0xf0, 0x20);
1561 
1562         // Ensure that we have sent only one instruction for (String::from("something", "1234"))
1563         encoder.send_instructions(ONE_INSTRUCTION_1);
1564 
1565         // exchange a flow control update.
1566         let out = encoder.peer_conn.process(None, now());
1567         mem::drop(encoder.conn.process(out.dgram(), now()));
1568 
1569         // Try writing a new header block. Now, headers will be added to the dynamic table again, because
1570         // instructions can be sent.
1571         let buf3 = encoder
1572             .encoder
1573             .encode_header_block(
1574                 &mut encoder.conn,
1575                 &[
1576                     Header::new("something5", "1234"),
1577                     Header::new("something6", "12345678910"),
1578                 ],
1579                 3,
1580             )
1581             .unwrap();
1582         // Assert that the first header is encoded as an index to the dynamic table (a post form).
1583         assert_eq!(buf3[2], 0x10);
1584         // Assert that the second header is encoded as a literal with a name literal
1585         assert_eq!(buf3[3] & 0xf0, 0x20);
1586 
1587         // Asset that one instruction has been sent
1588         encoder.send_instructions(ONE_INSTRUCTION_2);
1589     }
1590 
1591     #[test]
encoder_max_capacity_limit()1592     fn encoder_max_capacity_limit() {
1593         let mut encoder = connect(false);
1594 
1595         // change capacity to 2000.
1596         assert!(encoder.encoder.set_max_capacity(2000).is_ok());
1597         encoder.send_instructions(CAP_INSTRUCTION_1500);
1598     }
1599 
1600     #[test]
test_do_not_evict_entry_that_are_referd_only_by_the_same_header_blocked_encoding()1601     fn test_do_not_evict_entry_that_are_referd_only_by_the_same_header_blocked_encoding() {
1602         let mut encoder = connect(false);
1603 
1604         encoder.encoder.set_max_blocked_streams(20).unwrap();
1605         assert!(encoder.change_capacity(50).is_ok());
1606 
1607         encoder
1608             .encoder
1609             .send_and_insert(&mut encoder.conn, b"something5", b"1234")
1610             .unwrap();
1611 
1612         encoder.encoder.send(&mut encoder.conn).unwrap();
1613         let out = encoder.conn.process(None, now());
1614         mem::drop(encoder.peer_conn.process(out.dgram(), now()));
1615         // receive an insert count increment.
1616         recv_instruction(&mut encoder, &[0x01]);
1617 
1618         assert!(encoder
1619             .encoder
1620             .encode_header_block(
1621                 &mut encoder.conn,
1622                 &[
1623                     Header::new("something5", "1234"),
1624                     Header::new("something6", "1234"),
1625                 ],
1626                 3,
1627             )
1628             .is_ok());
1629     }
1630 
1631     #[test]
test_streams_cancel_cleans_up_unacked_header_blocks()1632     fn test_streams_cancel_cleans_up_unacked_header_blocks() {
1633         let mut encoder = connect(false);
1634 
1635         encoder.encoder.set_max_blocked_streams(10).unwrap();
1636         assert!(encoder.change_capacity(60).is_ok());
1637         encoder.send_instructions(CAP_INSTRUCTION_60);
1638 
1639         // insert "content-length: 1234
1640         encoder.insert(
1641             HEADER_CONTENT_LENGTH,
1642             VALUE_1,
1643             HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
1644         );
1645 
1646         // send a header block
1647         encoder.encode_header_block(
1648             1,
1649             &[Header::new("content-length", "1234")],
1650             ENCODE_INDEXED_REF_DYNAMIC,
1651             &[],
1652         );
1653 
1654         // receive a stream canceled instruction.
1655         recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
1656 
1657         recv_instruction(&mut encoder, &[0x01]);
1658     }
1659 }
1660