1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6
7 use crate::decoder_instructions::{DecoderInstruction, DecoderInstructionReader};
8 use crate::encoder_instructions::EncoderInstruction;
9 use crate::header_block::HeaderEncoder;
10 use crate::qlog;
11 use crate::qpack_send_buf::QpackData;
12 use crate::reader::ReceiverConnWrapper;
13 use crate::stats::Stats;
14 use crate::table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE};
15 use crate::{Error, QpackSettings, Res};
16 use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace, Header};
17 use neqo_transport::{Connection, Error as TransportError, StreamId};
18 use std::collections::{HashMap, HashSet, VecDeque};
19 use std::convert::TryFrom;
20
21 pub const QPACK_UNI_STREAM_TYPE_ENCODER: u64 = 0x2;
22
/// Tracks the life cycle of the local stream on which encoder instructions are sent.
#[derive(Debug, PartialEq)]
enum LocalStreamState {
    /// No stream has been registered via `add_send_stream` yet.
    NoStream,
    /// A stream id is known, but the stream type has not been written to the stream yet.
    Uninitialized(StreamId),
    /// The stream type has been written; `send` only needs to flush pending instructions.
    Initialized(StreamId),
}
29
30 impl LocalStreamState {
stream_id(&self) -> Option<StreamId>31 pub fn stream_id(&self) -> Option<StreamId> {
32 match self {
33 Self::NoStream => None,
34 Self::Uninitialized(stream_id) | Self::Initialized(stream_id) => Some(*stream_id),
35 }
36 }
37 }
38
/// QPACK encoder state: the header table, the local encoder stream and the bookkeeping of
/// header blocks that the peer has not acknowledged yet.
#[derive(Debug)]
pub struct QPackEncoder {
    /// Header table used for lookups and insertions while encoding.
    table: HeaderTable,
    /// Upper bound for the table capacity, taken from `QpackSettings::max_table_size_encoder`.
    max_table_size: u64,
    /// Set to `capacity / 32` whenever a capacity change is applied; handed to `HeaderEncoder`.
    max_entries: u64,
    /// Incremental reader for the instructions received from the peer's decoder.
    instruction_reader: DecoderInstructionReader,
    /// State of the local stream on which encoder instructions are sent.
    local_stream: LocalStreamState,
    /// Maximum number of streams that may be blocked, set via `set_max_blocked_streams`.
    max_blocked_streams: u16,
    // Remember header blocks that are referring to dynamic table.
    // There can be multiple header blocks in one stream, headers, trailer, push stream request, etc.
    // This HashMap maps a stream ID to a list of header blocks. Each header block is a list of
    // referenced dynamic table entries.
    unacked_header_blocks: HashMap<u64, VecDeque<HashSet<u64>>>,
    /// Number of streams that have an unacked header block referencing a table entry whose
    /// insertion the peer has not acknowledged yet.
    blocked_stream_cnt: u16,
    /// Whether Huffman encoding is used when marshalling instructions and header blocks.
    use_huffman: bool,
    /// Capacity requested via `set_max_capacity` that still has to be sent to the peer.
    next_capacity: Option<u64>,
    /// Counters returned by `stats()`.
    stats: Stats,
}
57
58 impl QPackEncoder {
59 #[must_use]
new(qpack_settings: QpackSettings, use_huffman: bool) -> Self60 pub fn new(qpack_settings: QpackSettings, use_huffman: bool) -> Self {
61 Self {
62 table: HeaderTable::new(true),
63 max_table_size: qpack_settings.max_table_size_encoder,
64 max_entries: 0,
65 instruction_reader: DecoderInstructionReader::new(),
66 local_stream: LocalStreamState::NoStream,
67 max_blocked_streams: 0,
68 unacked_header_blocks: HashMap::new(),
69 blocked_stream_cnt: 0,
70 use_huffman,
71 next_capacity: None,
72 stats: Stats::default(),
73 }
74 }
75
76 /// This function is use for setting encoders table max capacity. The value is received as
77 /// a `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting parameter.
78 /// # Errors
79 /// `EncoderStream` if value is too big.
80 /// `ChangeCapacity` if table capacity cannot be reduced.
set_max_capacity(&mut self, cap: u64) -> Res<()>81 pub fn set_max_capacity(&mut self, cap: u64) -> Res<()> {
82 if cap > (1 << 30) - 1 {
83 return Err(Error::EncoderStream);
84 }
85
86 if cap == self.table.capacity() {
87 return Ok(());
88 }
89
90 qdebug!(
91 [self],
92 "Set max capacity to new capacity:{} old:{} max_table_size={}.",
93 cap,
94 self.table.capacity(),
95 self.max_table_size,
96 );
97
98 let new_cap = std::cmp::min(self.max_table_size, cap);
99 // we also set our table to the max allowed.
100 self.change_capacity(new_cap);
101 Ok(())
102 }
103
104 /// This function is use for setting encoders max blocked streams. The value is received as
105 /// a `SETTINGS_QPACK_BLOCKED_STREAMS` setting parameter.
106 /// # Errors
107 /// `EncoderStream` if value is too big.
set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()>108 pub fn set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()> {
109 self.max_blocked_streams = u16::try_from(blocked_streams).or(Err(Error::EncoderStream))?;
110 Ok(())
111 }
112
113 /// Reads decoder instructions.
114 /// # Errors
115 /// May return: `ClosedCriticalStream` if stream has been closed or `DecoderStream`
116 /// in case of any other transport error.
receive(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()>117 pub fn receive(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()> {
118 self.read_instructions(conn, stream_id)
119 .map_err(|e| map_error(&e))
120 }
121
read_instructions(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()>122 fn read_instructions(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()> {
123 qdebug!([self], "read a new instraction");
124 loop {
125 let mut recv = ReceiverConnWrapper::new(conn, stream_id);
126 match self.instruction_reader.read_instructions(&mut recv) {
127 Ok(instruction) => self.call_instruction(instruction, &mut conn.qlog_mut())?,
128 Err(Error::NeedMoreData) => break Ok(()),
129 Err(e) => break Err(e),
130 }
131 }
132 }
133
recalculate_blocked_streams(&mut self)134 fn recalculate_blocked_streams(&mut self) {
135 let acked_inserts_cnt = self.table.get_acked_inserts_cnt();
136 self.blocked_stream_cnt = 0;
137 for hb_list in self.unacked_header_blocks.values_mut() {
138 debug_assert!(!hb_list.is_empty());
139 if hb_list.iter().flatten().any(|e| *e >= acked_inserts_cnt) {
140 self.blocked_stream_cnt += 1;
141 }
142 }
143 }
144
145 #[allow(
146 clippy::map_err_ignore,
147 unknown_lints,
148 renamed_and_removed_lints,
149 clippy::unknown_clippy_lints
150 )]
insert_count_instruction(&mut self, increment: u64) -> Res<()>151 fn insert_count_instruction(&mut self, increment: u64) -> Res<()> {
152 self.table
153 .increment_acked(increment)
154 .map_err(|_| Error::DecoderStream)?;
155 self.recalculate_blocked_streams();
156 Ok(())
157 }
158
header_ack(&mut self, stream_id: u64)159 fn header_ack(&mut self, stream_id: u64) {
160 self.stats.header_acks_recv += 1;
161 let mut new_acked = self.table.get_acked_inserts_cnt();
162 if let Some(hb_list) = self.unacked_header_blocks.get_mut(&stream_id) {
163 if let Some(ref_list) = hb_list.pop_back() {
164 for iter in ref_list {
165 self.table.remove_ref(iter);
166 if iter >= new_acked {
167 new_acked = iter + 1;
168 }
169 }
170 } else {
171 debug_assert!(false, "We should have at least one header block.");
172 }
173 if hb_list.is_empty() {
174 self.unacked_header_blocks.remove(&stream_id);
175 }
176 }
177 if new_acked > self.table.get_acked_inserts_cnt() {
178 self.insert_count_instruction(new_acked - self.table.get_acked_inserts_cnt())
179 .expect("This should neve happen");
180 }
181 }
182
stream_cancellation(&mut self, stream_id: u64)183 fn stream_cancellation(&mut self, stream_id: u64) {
184 self.stats.stream_cancelled_recv += 1;
185 let mut was_blocker = false;
186 if let Some(mut hb_list) = self.unacked_header_blocks.remove(&stream_id) {
187 debug_assert!(!hb_list.is_empty());
188 while let Some(ref_list) = hb_list.pop_front() {
189 for iter in ref_list {
190 self.table.remove_ref(iter);
191 was_blocker = was_blocker || (iter >= self.table.get_acked_inserts_cnt());
192 }
193 }
194 }
195 if was_blocker {
196 debug_assert!(self.blocked_stream_cnt > 0);
197 self.blocked_stream_cnt -= 1;
198 }
199 }
200
call_instruction( &mut self, instruction: DecoderInstruction, qlog: &mut NeqoQlog, ) -> Res<()>201 fn call_instruction(
202 &mut self,
203 instruction: DecoderInstruction,
204 qlog: &mut NeqoQlog,
205 ) -> Res<()> {
206 qdebug!([self], "call intruction {:?}", instruction);
207 match instruction {
208 DecoderInstruction::InsertCountIncrement { increment } => {
209 qlog::qpack_read_insert_count_increment_instruction(
210 qlog,
211 increment,
212 &increment.to_be_bytes(),
213 );
214
215 self.insert_count_instruction(increment)
216 }
217 DecoderInstruction::HeaderAck { stream_id } => {
218 self.header_ack(stream_id);
219 Ok(())
220 }
221 DecoderInstruction::StreamCancellation { stream_id } => {
222 self.stream_cancellation(stream_id);
223 Ok(())
224 }
225 DecoderInstruction::NoInstruction => Ok(()),
226 }
227 }
228
229 /// Inserts a new entry into a table and sends the corresponding instruction to a peer. An entry is added only
230 /// if it is possible to send the corresponding instruction immediately, i.e. the encoder stream is not
231 /// blocked by the flow control (or stream internal buffer(this is very unlikely)).
232 /// ### Errors
233 /// `EncoderStreamBlocked` if the encoder stream is blocked by the flow control.
234 /// `DynamicTableFull` if the dynamic table does not have enough space for the entry.
235 /// The function can return transport errors: `InvalidStreamId`, `InvalidInput` and `FinalSizeError`.
236 /// # Panics
237 /// When the insertion fails (it should not).
send_and_insert( &mut self, conn: &mut Connection, name: &[u8], value: &[u8], ) -> Res<u64>238 pub fn send_and_insert(
239 &mut self,
240 conn: &mut Connection,
241 name: &[u8],
242 value: &[u8],
243 ) -> Res<u64> {
244 qdebug!([self], "insert {:?} {:?}.", name, value);
245
246 let entry_size = name.len() + value.len() + ADDITIONAL_TABLE_ENTRY_SIZE;
247
248 if !self.table.insert_possible(entry_size) {
249 return Err(Error::DynamicTableFull);
250 }
251
252 let mut buf = QpackData::default();
253 EncoderInstruction::InsertWithNameLiteral {
254 name: &name,
255 value: &value,
256 }
257 .marshal(&mut buf, self.use_huffman);
258
259 let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;
260
261 let sent = conn
262 .stream_send_atomic(stream_id.as_u64(), &buf)
263 .map_err(|e| map_stream_send_atomic_error(&e))?;
264 if !sent {
265 return Err(Error::EncoderStreamBlocked);
266 }
267
268 self.stats.dynamic_table_inserts += 1;
269
270 match self.table.insert(name, value) {
271 Ok(inx) => Ok(inx),
272 Err(e) => {
273 debug_assert!(false);
274 Err(e)
275 }
276 }
277 }
278
change_capacity(&mut self, value: u64)279 fn change_capacity(&mut self, value: u64) {
280 qdebug!([self], "change capacity: {}", value);
281 self.next_capacity = Some(value);
282 }
283
maybe_send_change_capacity( &mut self, conn: &mut Connection, stream_id: StreamId, ) -> Res<()>284 fn maybe_send_change_capacity(
285 &mut self,
286 conn: &mut Connection,
287 stream_id: StreamId,
288 ) -> Res<()> {
289 if let Some(cap) = self.next_capacity {
290 // Check if it is possible to reduce the capacity, e.g. if enough space can be make free for the reduction.
291 if cap < self.table.capacity() && !self.table.can_evict_to(cap) {
292 return Err(Error::DynamicTableFull);
293 }
294 let mut buf = QpackData::default();
295 EncoderInstruction::Capacity { value: cap }.marshal(&mut buf, self.use_huffman);
296 if !conn.stream_send_atomic(stream_id.as_u64(), &buf)? {
297 return Err(Error::EncoderStreamBlocked);
298 }
299 if self.table.set_capacity(cap).is_err() {
300 debug_assert!(
301 false,
302 "can_evict_to should have checked and make sure this operation is possible"
303 );
304 return Err(Error::InternalError(1));
305 }
306 self.max_entries = cap / 32;
307 self.next_capacity = None;
308 }
309 Ok(())
310 }
311
312 /// Sends any qpack encoder instructions.
313 /// # Errors
314 /// returns `EncoderStream` in case of an error.
send(&mut self, conn: &mut Connection) -> Res<()>315 pub fn send(&mut self, conn: &mut Connection) -> Res<()> {
316 match self.local_stream {
317 LocalStreamState::NoStream => {
318 qerror!("Send call but there is no stream yet.");
319 Ok(())
320 }
321 LocalStreamState::Uninitialized(stream_id) => {
322 let mut buf = QpackData::default();
323 buf.encode_varint(QPACK_UNI_STREAM_TYPE_ENCODER);
324 if !conn.stream_send_atomic(stream_id.as_u64(), &buf[..])? {
325 return Err(Error::EncoderStreamBlocked);
326 }
327 self.local_stream = LocalStreamState::Initialized(stream_id);
328 self.maybe_send_change_capacity(conn, stream_id)
329 }
330 LocalStreamState::Initialized(stream_id) => {
331 self.maybe_send_change_capacity(conn, stream_id)
332 }
333 }
334 }
335
is_stream_blocker(&self, stream_id: u64) -> bool336 fn is_stream_blocker(&self, stream_id: u64) -> bool {
337 if let Some(hb_list) = self.unacked_header_blocks.get(&stream_id) {
338 debug_assert!(!hb_list.is_empty());
339 match hb_list.iter().flatten().max() {
340 Some(max_ref) => *max_ref >= self.table.get_acked_inserts_cnt(),
341 None => false,
342 }
343 } else {
344 false
345 }
346 }
347
348 /// Encodes headers
349 /// # Errors
350 /// `ClosedCriticalStream` if the encoder stream is closed.
351 /// `InternalError` if an unexpected error occurred.
352 /// # Panics
353 /// If there is a programming error.
354 #[allow(
355 unknown_lints,
356 renamed_and_removed_lints,
357 clippy::unknown_clippy_lints,
358 clippy::unnested_or_patterns
359 )] // Until we require rust 1.53 we can't use or_patterns.
encode_header_block( &mut self, conn: &mut Connection, h: &[Header], stream_id: u64, ) -> Res<HeaderEncoder>360 pub fn encode_header_block(
361 &mut self,
362 conn: &mut Connection,
363 h: &[Header],
364 stream_id: u64,
365 ) -> Res<HeaderEncoder> {
366 qdebug!([self], "encoding headers.");
367
368 let mut encoder_blocked = false;
369 // Try to send capacity instructions if present.
370 match self.send(conn) {
371 Ok(()) => {}
372 Err(Error::EncoderStreamBlocked) => {
373 encoder_blocked = true;
374 }
375 Err(e) => {
376 // `InternalError`, `ClosedCriticalStream`
377 return Err(e);
378 }
379 }
380
381 let mut encoded_h =
382 HeaderEncoder::new(self.table.base(), self.use_huffman, self.max_entries);
383
384 let stream_is_blocker = self.is_stream_blocker(stream_id);
385 let can_block = self.blocked_stream_cnt < self.max_blocked_streams || stream_is_blocker;
386
387 let mut ref_entries = HashSet::new();
388
389 for iter in h.iter() {
390 let name = iter.name().as_bytes().to_vec();
391 let value = iter.value().as_bytes().to_vec();
392 qtrace!("encoding {:x?} {:x?}.", name, value);
393
394 if let Some(LookupResult {
395 index,
396 static_table,
397 value_matches,
398 }) = self.table.lookup(&name, &value, can_block)
399 {
400 qtrace!(
401 [self],
402 "found a {} entry, value-match={}",
403 if static_table { "static" } else { "dynamic" },
404 value_matches
405 );
406 if value_matches {
407 if static_table {
408 encoded_h.encode_indexed_static(index);
409 } else {
410 encoded_h.encode_indexed_dynamic(index);
411 }
412 } else {
413 encoded_h.encode_literal_with_name_ref(static_table, index, &value);
414 }
415 if !static_table && ref_entries.insert(index) {
416 self.table.add_ref(index);
417 }
418 } else if can_block & !encoder_blocked {
419 // Insert using an InsertWithNameLiteral instruction. This entry name does not match any name in the
420 // tables therefore we cannot use any other instruction.
421 match self.send_and_insert(conn, &name, &value) {
422 Ok(index) => {
423 encoded_h.encode_indexed_dynamic(index);
424 ref_entries.insert(index);
425 self.table.add_ref(index);
426 }
427 Err(Error::EncoderStreamBlocked) | Err(Error::DynamicTableFull) => {
428 // As soon as one of the instructions cannot be written or the table is full, do not try again.
429 encoder_blocked = true;
430 encoded_h.encode_literal_with_name_literal(&name, &value)
431 }
432 Err(e) => {
433 // `InternalError`, `ClosedCriticalStream`
434 return Err(e);
435 }
436 }
437 } else {
438 encoded_h.encode_literal_with_name_literal(&name, &value);
439 }
440 }
441
442 encoded_h.encode_header_block_prefix();
443
444 if !stream_is_blocker {
445 // The streams was not a blocker, check if the stream is a blocker now.
446 if let Some(max_ref) = ref_entries.iter().max() {
447 if *max_ref >= self.table.get_acked_inserts_cnt() {
448 debug_assert!(self.blocked_stream_cnt <= self.max_blocked_streams);
449 self.blocked_stream_cnt += 1;
450 }
451 }
452 }
453
454 if !ref_entries.is_empty() {
455 self.unacked_header_blocks
456 .entry(stream_id)
457 .or_insert_with(VecDeque::new)
458 .push_front(ref_entries);
459 self.stats.dynamic_table_references += 1;
460 }
461 Ok(encoded_h)
462 }
463
464 /// Encoder stream has been created. Add the stream id.
465 /// # Panics
466 /// If a stream has already been added.
add_send_stream(&mut self, stream_id: u64)467 pub fn add_send_stream(&mut self, stream_id: u64) {
468 if self.local_stream == LocalStreamState::NoStream {
469 self.local_stream = LocalStreamState::Uninitialized(StreamId::new(stream_id));
470 } else {
471 panic!("Adding multiple local streams");
472 }
473 }
474
475 #[must_use]
stats(&self) -> Stats476 pub fn stats(&self) -> Stats {
477 self.stats.clone()
478 }
479
480 #[must_use]
local_stream_id(&self) -> Option<u64>481 pub fn local_stream_id(&self) -> Option<u64> {
482 self.local_stream.stream_id().map(StreamId::as_u64)
483 }
484
    /// Test-only accessor for the current number of blocked streams.
    #[cfg(test)]
    fn blocked_stream_cnt(&self) -> u16 {
        self.blocked_stream_cnt
    }
489 }
490
491 impl ::std::fmt::Display for QPackEncoder {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result492 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
493 write!(f, "QPackEncoder")
494 }
495 }
496
map_error(err: &Error) -> Error497 fn map_error(err: &Error) -> Error {
498 if *err == Error::ClosedCriticalStream {
499 Error::ClosedCriticalStream
500 } else {
501 Error::DecoderStream
502 }
503 }
504
map_stream_send_atomic_error(err: &TransportError) -> Error505 fn map_stream_send_atomic_error(err: &TransportError) -> Error {
506 match err {
507 TransportError::InvalidStreamId | TransportError::FinalSizeError => {
508 Error::ClosedCriticalStream
509 }
510 _ => {
511 debug_assert!(false, "Unexpected error");
512 Error::InternalError(2)
513 }
514 }
515 }
516
517 #[cfg(test)]
518 mod tests {
519 use super::{Connection, Error, Header, QPackEncoder, Res};
520 use crate::QpackSettings;
521 use neqo_transport::{ConnectionParameters, StreamType};
522 use std::mem;
523 use test_fixture::{configure_server, default_client, default_server, handshake, now};
524
    /// Test fixture: an encoder together with a connected pair of connections.
    struct TestEncoder {
        /// The encoder under test.
        encoder: QPackEncoder,
        /// Stream created on `conn`; registered with the encoder as its local stream.
        send_stream_id: u64,
        /// Stream created on `peer_conn`; used to deliver decoder instructions to the encoder.
        recv_stream_id: u64,
        /// Connection the encoder writes to and reads from.
        conn: Connection,
        /// The other end of `conn`.
        peer_conn: Connection,
    }
532
    impl TestEncoder {
        /// Requests a new table capacity and immediately tries to send the corresponding
        /// instruction. Returns the result of the send.
        pub fn change_capacity(&mut self, capacity: u64) -> Res<()> {
            self.encoder.set_max_capacity(capacity).unwrap();
            // We will try to really change the table only when we send the change capacity instruction.
            self.encoder.send(&mut self.conn)
        }

        /// Inserts `header: value` into the table, which must succeed, and checks that the
        /// peer receives exactly `inst` on the encoder stream.
        pub fn insert(&mut self, header: &[u8], value: &[u8], inst: &[u8]) {
            let res = self.encoder.send_and_insert(&mut self.conn, header, value);
            assert!(res.is_ok());
            self.send_instructions(inst);
        }

        /// Encodes `headers` for `stream_id`, which must succeed, and checks both the produced
        /// header block and the instructions the peer receives on the encoder stream.
        pub fn encode_header_block(
            &mut self,
            stream_id: u64,
            headers: &[Header],
            expected_encoding: &[u8],
            inst: &[u8],
        ) {
            let buf = self
                .encoder
                .encode_header_block(&mut self.conn, headers, stream_id)
                .unwrap();
            assert_eq!(&buf[..], expected_encoding);
            self.send_instructions(inst);
        }

        /// Flushes pending encoder data, exchanges packets between the two connections and
        /// asserts that the peer reads exactly `encoder_instruction` from the encoder stream.
        pub fn send_instructions(&mut self, encoder_instruction: &[u8]) {
            self.encoder.send(&mut self.conn).unwrap();
            // conn -> peer_conn, then hand the peer's response back to conn.
            let out = self.conn.process(None, now());
            let out2 = self.peer_conn.process(out.dgram(), now());
            mem::drop(self.conn.process(out2.dgram(), now()));
            // Reads at most 100 bytes; enough for the instructions used in these tests.
            let mut buf = [0_u8; 100];
            let (amount, fin) = self
                .peer_conn
                .stream_recv(self.send_stream_id, &mut buf)
                .unwrap();
            assert!(!fin);
            assert_eq!(buf[..amount], encoder_instruction[..]);
        }
    }
575
connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder576 fn connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder {
577 let mut conn = default_client();
578 let mut peer_conn = max_data.map_or_else(default_server, |max| {
579 configure_server(
580 ConnectionParameters::default()
581 .max_stream_data(StreamType::UniDi, true, max)
582 .max_stream_data(StreamType::BiDi, true, max)
583 .max_stream_data(StreamType::BiDi, false, max),
584 )
585 });
586 handshake(&mut conn, &mut peer_conn);
587
588 // create a stream
589 let recv_stream_id = peer_conn.stream_create(StreamType::UniDi).unwrap();
590 let send_stream_id = conn.stream_create(StreamType::UniDi).unwrap();
591
592 // create an encoder
593 let mut encoder = QPackEncoder::new(
594 QpackSettings {
595 max_table_size_encoder: 1500,
596 max_table_size_decoder: 0,
597 max_blocked_streams: 0,
598 },
599 huffman,
600 );
601 encoder.add_send_stream(send_stream_id);
602
603 TestEncoder {
604 encoder,
605 send_stream_id,
606 recv_stream_id,
607 conn,
608 peer_conn,
609 }
610 }
611
    /// Creates a `TestEncoder` with a default server, i.e. without a custom flow control limit.
    fn connect(huffman: bool) -> TestEncoder {
        connect_generic(huffman, None)
    }
615
    /// Creates a Huffman-enabled `TestEncoder` whose peer limits stream data to `max_data`.
    fn connect_flow_control(max_data: u64) -> TestEncoder {
        connect_generic(true, Some(max_data))
    }
619
recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8])620 fn recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8]) {
621 encoder
622 .peer_conn
623 .stream_send(encoder.recv_stream_id, decoder_instruction)
624 .unwrap();
625 let out = encoder.peer_conn.process(None, now());
626 mem::drop(encoder.conn.process(out.dgram(), now()));
627 assert!(encoder
628 .encoder
629 .read_instructions(&mut encoder.conn, encoder.recv_stream_id)
630 .is_ok());
631 }
632
    // Expected encoder stream content after the first send: the stream type (0x02) followed
    // by a capacity instruction (0x3f = `001` pattern with the 5-bit prefix saturated; the
    // remaining bytes carry `capacity - 31` as a variable-length integer).
    const CAP_INSTRUCTION_200: &[u8] = &[0x02, 0x3f, 0xa9, 0x01];
    const CAP_INSTRUCTION_60: &[u8] = &[0x02, 0x3f, 0x1d];
    const CAP_INSTRUCTION_1000: &[u8] = &[0x02, 0x3f, 0xc9, 0x07];
    const CAP_INSTRUCTION_1500: &[u8] = &[0x02, 0x3f, 0xbd, 0x0b];

    // ASCII for "content-length".
    const HEADER_CONTENT_LENGTH: &[u8] = &[
        0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
    ];
    // ASCII for "1234".
    const VALUE_1: &[u8] = &[0x31, 0x32, 0x33, 0x34];
    // ASCII for "12345".
    const VALUE_2: &[u8] = &[0x31, 0x32, 0x33, 0x34, 0x35];

    // HEADER_CONTENT_LENGTH and VALUE_1 encoded by instruction insert_with_name_literal.
    const HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL: &[u8] = &[
        0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
        0x04, 0x31, 0x32, 0x33, 0x34,
    ];

    // HEADER_CONTENT_LENGTH and VALUE_2 encoded by instruction insert_with_name_literal.
    const HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL: &[u8] = &[
        0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
        0x05, 0x31, 0x32, 0x33, 0x34, 0x35,
    ];

    // Indexed Header Field that refers to the first entry in the dynamic table.
    // The first two bytes are the header block prefix.
    const ENCODE_INDEXED_REF_DYNAMIC: &[u8] = &[0x02, 0x00, 0x80];

    // Decoder instructions: header acknowledgement (`1` pattern) for streams 1 and 2 and
    // stream cancellation (`01` pattern) for stream 1.
    const HEADER_ACK_STREAM_ID_1: &[u8] = &[0x81];
    const HEADER_ACK_STREAM_ID_2: &[u8] = &[0x82];
    const STREAM_CANCELED_ID_1: &[u8] = &[0x41];
662
663 // test insert_with_name_literal which fails because there is not enough space in the table
664 #[test]
test_insert_with_name_literal_1()665 fn test_insert_with_name_literal_1() {
666 let mut encoder = connect(false);
667
668 // insert "content-length: 1234
669 let res =
670 encoder
671 .encoder
672 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
673 assert_eq!(Error::DynamicTableFull, res.unwrap_err());
674 encoder.send_instructions(&[0x02]);
675 }
676
677 // test insert_with_name_literal - succeeds
678 #[test]
test_insert_with_name_literal_2()679 fn test_insert_with_name_literal_2() {
680 let mut encoder = connect(false);
681
682 assert!(encoder.encoder.set_max_capacity(200).is_ok());
683 // test the change capacity instruction.
684 encoder.send_instructions(CAP_INSTRUCTION_200);
685
686 // insert "content-length: 1234
687 let res =
688 encoder
689 .encoder
690 .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
691 assert!(res.is_ok());
692 encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
693 }
694
695 #[test]
test_change_capacity()696 fn test_change_capacity() {
697 let mut encoder = connect(false);
698
699 assert!(encoder.encoder.set_max_capacity(200).is_ok());
700 encoder.send_instructions(CAP_INSTRUCTION_200);
701 }
702
    /// One table-driven test case for header block encoding.
    struct TestElement {
        /// Headers to encode.
        pub headers: Vec<Header>,
        /// Expected encoded header block.
        pub header_block: &'static [u8],
        /// Expected bytes on the encoder stream produced while encoding `headers`.
        pub encoder_inst: &'static [u8],
    }
708
    // Encodes a sequence of header lists without Huffman encoding and checks the header
    // blocks and the encoder instructions. The cases run in order on the same encoder, so
    // later cases depend on the table entries inserted by earlier ones.
    #[test]
    fn test_header_block_encoder_non() {
        let test_cases: [TestElement; 6] = [
            // test a header with ref to static - encode_indexed
            TestElement {
                headers: vec![Header::new(":method", "GET")],
                header_block: &[0x00, 0x00, 0xd1],
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref
            TestElement {
                headers: vec![Header::new(":path", "/somewhere")],
                header_block: &[
                    0x00, 0x00, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65, 0x72,
                    0x65,
                ],
                encoder_inst: &[],
            },
            // test adding a new header and encode_post_base_index, also test fix_header_block_prefix
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: &[0x02, 0x80, 0x10],
                encoder_inst: &[
                    0x49, 0x6d, 0x79, 0x2d, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x08, 0x6d, 0x79,
                    0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65,
                ],
            },
            // test encode_indexed with a ref to dynamic table.
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: ENCODE_INDEXED_REF_DYNAMIC,
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref.
            TestElement {
                headers: vec![Header::new("my-header", "my-value2")],
                header_block: &[
                    0x02, 0x00, 0x40, 0x09, 0x6d, 0x79, 0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32,
                ],
                encoder_inst: &[],
            },
            // test multiple headers
            TestElement {
                headers: vec![
                    Header::new(":method", "GET"),
                    Header::new(":path", "/somewhere"),
                    Header::new(":authority", "example.com"),
                    Header::new(":scheme", "https"),
                ],
                header_block: &[
                    0x00, 0x01, 0xd1, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65,
                    0x72, 0x65, 0x50, 0x0b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63,
                    0x6f, 0x6d, 0xd7,
                ],
                encoder_inst: &[],
            },
        ];

        let mut encoder = connect(false);

        // Allow blocked streams so that header blocks may reference unacknowledged entries.
        encoder.encoder.set_max_blocked_streams(100).unwrap();
        encoder.encoder.set_max_capacity(200).unwrap();

        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        // All header blocks are encoded for stream 1.
        for t in &test_cases {
            let buf = encoder
                .encoder
                .encode_header_block(&mut encoder.conn, &t.headers, 1)
                .unwrap();
            assert_eq!(&buf[..], t.header_block);
            encoder.send_instructions(t.encoder_inst);
        }
    }
784
    // Same sequence as the non-Huffman test, but with Huffman encoding enabled. The cases
    // run in order on the same encoder, so later cases depend on the table entries inserted
    // by earlier ones.
    #[test]
    fn test_header_block_encoder_huffman() {
        let test_cases: [TestElement; 6] = [
            // test a header with ref to static - encode_indexed
            TestElement {
                headers: vec![Header::new(":method", "GET")],
                header_block: &[0x00, 0x00, 0xd1],
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref
            TestElement {
                headers: vec![Header::new(":path", "/somewhere")],
                header_block: &[
                    0x00, 0x00, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85,
                ],
                encoder_inst: &[],
            },
            // test adding a new header and encode_post_base_index, also test fix_header_block_prefix
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: &[0x02, 0x80, 0x10],
                encoder_inst: &[
                    0x67, 0xa7, 0xd2, 0xd3, 0x94, 0x72, 0x16, 0xcf, 0x86, 0xa7, 0xd2, 0xdd, 0xc7,
                    0x45, 0xa5,
                ],
            },
            // test encode_indexed with a ref to dynamic table.
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: ENCODE_INDEXED_REF_DYNAMIC,
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref.
            TestElement {
                headers: vec![Header::new("my-header", "my-value2")],
                header_block: &[
                    0x02, 0x00, 0x40, 0x87, 0xa7, 0xd2, 0xdd, 0xc7, 0x45, 0xa5, 0x17,
                ],
                encoder_inst: &[],
            },
            // test multiple headers
            TestElement {
                headers: vec![
                    Header::new(":method", "GET"),
                    Header::new(":path", "/somewhere"),
                    Header::new(":authority", "example.com"),
                    Header::new(":scheme", "https"),
                ],
                header_block: &[
                    0x00, 0x01, 0xd1, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85, 0x50,
                    0x88, 0x2f, 0x91, 0xd3, 0x5d, 0x05, 0x5c, 0x87, 0xa7, 0xd7,
                ],
                encoder_inst: &[],
            },
        ];

        let mut encoder = connect(true);

        // Allow blocked streams so that header blocks may reference unacknowledged entries.
        encoder.encoder.set_max_blocked_streams(100).unwrap();
        encoder.encoder.set_max_capacity(200).unwrap();

        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        // All header blocks are encoded for stream 1.
        for t in &test_cases {
            let buf = encoder
                .encoder
                .encode_header_block(&mut encoder.conn, &t.headers, 1)
                .unwrap();
            assert_eq!(&buf[..], t.header_block);
            encoder.send_instructions(t.encoder_inst);
        }
    }
858
    // Test that inserts are blocked while waiting for an insert count increment.
    #[test]
    fn test_insertion_blocked_on_insert_count_feedback() {
        let mut encoder = connect(false);

        // A capacity of 60 has room for only one of the entries used below.
        encoder.encoder.set_max_capacity(60).unwrap();

        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        // insert "content-length: 1234"
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // insert "content-length: 12345", which will fail because the entry in the table cannot be evicted.
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_err());
        // Nothing was written to the encoder stream.
        encoder.send_instructions(&[]);

        // receive an insert count increment.
        recv_instruction(&mut encoder, &[0x01]);

        // insert "content-length: 12345" again; this time it will succeed.
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
    }
896
    // Test that inserts are blocked while waiting for acks.
    // `wait` selects what unblocks the table insertion:
    // 0 - waiting for a header ack
    // any other value (the tests pass 1) - waiting for a stream cancel.
    fn test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(wait: u8) {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());
        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        // insert "content-length: 1234"
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // receive an insert count increment.
        recv_instruction(&mut encoder, &[0x01]);

        // send a header block on stream 1 that references the inserted entry.
        let buf = encoder
            .encoder
            .encode_header_block(
                &mut encoder.conn,
                &[Header::new("content-length", "1234")],
                1,
            )
            .unwrap();
        assert_eq!(&buf[..], ENCODE_INDEXED_REF_DYNAMIC);
        encoder.send_instructions(&[]);

        // insert "content-length: 12345", which will fail because the entry in the table cannot be evicted
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_err());
        encoder.send_instructions(&[]);

        if wait == 0 {
            // receive a header_ack.
            recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
        } else {
            // receive a stream canceled
            recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
        }

        // insert "content-length: 12345" again; this time it will succeed.
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
    }
955
// Runs the blocked-insertion scenario with the reference released by a header ack.
#[test]
fn test_header_ack() {
    // 0 selects the header-ack branch of the shared scenario.
    let release_with_header_ack = 0;
    test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(release_with_header_ack);
}
960
// Runs the blocked-insertion scenario with the reference released by a stream cancel.
#[test]
fn test_stream_canceled() {
    // Any non-zero value selects the stream-cancel branch of the shared scenario.
    let release_with_stream_cancel = 1;
    test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(release_with_stream_cancel);
}
965
// Panics unless the two most significant bits of the third byte of `buf` are `10`
// (the pattern of an indexed field line that refers to the dynamic table).
// Assumes the first field line starts at offset 2, i.e. after a two-byte prefix.
fn assert_is_index_to_dynamic(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte >> 6, 0b10);
}
969
// Panics unless the upper nibble of the third byte of `buf` is `0001`
// (the pattern of an indexed field line with a post-base index).
// Assumes the first field line starts at offset 2, i.e. after a two-byte prefix.
fn assert_is_index_to_dynamic_post(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte >> 4, 0x1);
}
973
// Panics unless the upper nibble of the third byte of `buf` is `0101`
// (the pattern of a literal field line whose name is a static-table reference).
// Assumes the first field line starts at offset 2, i.e. after a two-byte prefix.
fn assert_is_index_to_static_name_only(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte >> 4, 0x5);
}
977
// Panics unless the upper nibble of the third byte of `buf` is `0010`
// (the pattern of a literal field line with a literal name).
// Assumes the first field line starts at offset 2, i.e. after a two-byte prefix.
fn assert_is_literal_value_literal_name(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte >> 4, 0x2);
}
981
// With max_blocked_streams == 1, only the stream that is already blocked may keep
// referring to an unacknowledged dynamic table entry.
#[test]
fn max_block_streams1() {
    let mut enc = connect(false);
    let headers = [Header::new("content-length", "1234")];

    // Change the capacity to 60.
    assert!(enc.encoder.set_max_capacity(60).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_60);

    // Insert "content-length: 1234".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    enc.encoder.set_max_blocked_streams(1).unwrap();

    // Stream 1: the header block refers to the not-yet-acknowledged entry.
    let first_on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic(&first_on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    enc.send_instructions(&[]);

    // Stream 2: using the dynamic entry would exceed the max_blocked_streams limit,
    // so the header is encoded without it.
    let on_stream_2 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 2)
        .unwrap();
    assert_is_index_to_static_name_only(&on_stream_2);

    enc.send_instructions(&[]);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Stream 1 again: it is already blocked, so it can still use the entry.
    let second_on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic(&second_on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);
}
1045
// A stream that is already blocked may reference further unacknowledged entries even
// when max_blocked_streams == 1.
#[test]
fn max_block_streams2() {
    let mut enc = connect(false);

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    // Insert "content-length: 1234".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Insert "content-length: 12345".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_2)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);

    enc.encoder.set_max_blocked_streams(1).unwrap();

    let stream_id = 1;

    // First header block: refers to the first (unacknowledged) entry.
    let first_headers = [Header::new("content-length", "1234")];
    let first_block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &first_headers, stream_id)
        .unwrap();
    assert_is_index_to_dynamic(&first_block);

    // Second header block on the same stream: refers to the second entry in the dynamic
    // table. This works because the stream is already counted as blocked.
    let second_headers = [Header::new("content-length", "12345")];
    let second_block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &second_headers, stream_id)
        .unwrap();
    assert_is_index_to_dynamic(&second_block);
}
1101
// With max_blocked_streams == 1, new entries are only created on behalf of the stream
// that is already blocked.
#[test]
fn max_block_streams3() {
    let mut enc = connect(false);
    let name1 = [Header::new("name1", "value1")];
    let name2 = [Header::new("name2", "value2")];

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    enc.encoder.set_max_blocked_streams(1).unwrap();
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);

    // Stream 1: the header block creates a new entry and refers to it.
    let first_on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name1, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&first_on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Stream 2: no new entry is created because the encoder is at the
    // max_blocked_streams limit.
    let on_stream_2 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name2, 2)
        .unwrap();
    assert_is_literal_value_literal_name(&on_stream_2);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Stream 1 again: the already blocked stream can still create a new entry.
    let second_on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name2, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&second_on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);
}
1142
// A stream with two header blocks that each created an entry stays blocked after only
// the first header block is acknowledged.
#[test]
fn max_block_streams4() {
    let mut enc = connect(false);
    let name1 = [Header::new("name1", "value1")];
    let name2 = [Header::new("name2", "value2")];

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    enc.encoder.set_max_blocked_streams(1).unwrap();
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);

    // First header block on stream 1: creates a new entry and refers to it.
    let first_block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name1, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&first_block);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Second header block on the same, already blocked stream: creates another entry.
    let second_block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name2, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&second_block);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Header ack for the first header block.
    recv_instruction(&mut enc, HEADER_ACK_STREAM_ID_1);

    // The stream is still blocking because the second header block is not acked.
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);
}
1180
// Two header blocks on one stream refer to the same entry; a single header ack
// unblocks the stream.
#[test]
fn max_block_streams5() {
    let mut enc = connect(false);
    let headers = [Header::new("name1", "value1")];

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    enc.encoder.set_max_blocked_streams(1).unwrap();
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);

    // First header block on stream 1: creates a new entry and refers to it.
    let first_block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&first_block);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Second header block on the same stream with the same header: it refers to the
    // entry that already exists instead of creating a new one.
    let second_block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic(&second_block);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Header ack for the first header block.
    recv_instruction(&mut enc, HEADER_ACK_STREAM_ID_1);

    // The stream is not blocking anymore because the header ack also acks the
    // insert instruction.
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);
}
1218
// Two streams each create an entry; a header ack for the second stream unblocks both.
#[test]
fn max_block_streams6() {
    let mut enc = connect(false);
    let name1 = [Header::new("name1", "value1")];
    let name2 = [Header::new("name2", "value2")];

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    enc.encoder.set_max_blocked_streams(2).unwrap();
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);

    // Stream 1: creates a new entry and refers to it.
    let on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name1, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Stream 2: the limit is 2, so this header block creates a new entry as well.
    let on_stream_2 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name2, 2)
        .unwrap();
    assert_is_index_to_dynamic_post(&on_stream_2);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 2);

    // Header ack for the second header block. This acks the first insertion as well.
    recv_instruction(&mut enc, HEADER_ACK_STREAM_ID_2);

    // Neither stream is blocking anymore because the header ack also acks the
    // insert instructions.
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);
}
1256
// Two streams refer to the same unacknowledged entry; cancelling one of them leaves
// the other one blocked.
#[test]
fn max_block_streams7() {
    let mut enc = connect(false);
    let headers = [Header::new("name1", "value1")];

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    enc.encoder.set_max_blocked_streams(2).unwrap();
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);

    // Stream 1: creates a new entry and refers to it.
    let on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Stream 2: same header, so the block refers to the entry created above.
    let on_stream_2 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 2)
        .unwrap();
    assert_is_index_to_dynamic(&on_stream_2);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 2);

    // Stream cancel for the first stream. This removes stream 1 from the blocked
    // streams, but it does not mark the insert instruction as acked.
    recv_instruction(&mut enc, STREAM_CANCELED_ID_1);

    // The second stream is therefore still blocking.
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);
}
1296
// An insert count increment that covers only the first entry unblocks the stream that
// depends on that entry alone, but not the stream that also depends on a later entry.
#[test]
fn max_block_stream8() {
    let mut enc = connect(false);
    let name1 = [Header::new("name1", "value1")];
    let name2 = [Header::new("name2", "value2")];

    // Change the capacity to 200.
    assert!(enc.encoder.set_max_capacity(200).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_200);

    enc.encoder.set_max_blocked_streams(2).unwrap();
    assert_eq!(enc.encoder.blocked_stream_cnt(), 0);

    // Stream 1: creates a new entry and refers to it.
    let first_on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name1, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&first_on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);

    // Stream 2: refers to the same entry.
    let on_stream_2 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name1, 2)
        .unwrap();
    assert_is_index_to_dynamic(&on_stream_2);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 2);

    // Stream 1 again: a different header, which creates a second entry.
    let second_on_stream_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &name2, 1)
        .unwrap();
    assert_is_index_to_dynamic_post(&second_on_stream_1);
    assert_eq!(enc.encoder.blocked_stream_cnt(), 2);

    // Stream 1 is blocked on entries 1 and 2; stream 2 is blocked only on entry 1.
    // Receive an insert count increment for the first entry.
    recv_instruction(&mut enc, &[0x01]);

    // After that only stream 1 is still blocking.
    assert_eq!(enc.encoder.blocked_stream_cnt(), 1);
}
1344
// An entry can be evicted only after it is acknowledged AND the header block that
// refers to it has been acked.
#[test]
fn dynamic_table_can_evict1() {
    let mut enc = connect(false);
    let headers = [Header::new("content-length", "1234")];

    // Change the capacity to 60.
    assert!(enc.encoder.set_max_capacity(60).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_60);

    enc.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Send a header block that refers to the unacked entry.
    let block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic(&block);

    // Shrinking the table would evict the entry, so it fails.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_err());

    // Receive an insert count increment for the entry.
    recv_instruction(&mut enc, &[0x01]);

    // Eviction still fails: the stream is still referring to the entry.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_err());

    // Receive a header ack for the header block.
    recv_instruction(&mut enc, HEADER_ACK_STREAM_ID_1);

    // Now the entry can be evicted.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_ok());
}
1391
// An entry can be evicted only after it is acknowledged AND the stream that refers to
// it has been cancelled.
#[test]
fn dynamic_table_can_evict2() {
    let mut enc = connect(false);
    let headers = [Header::new("content-length", "1234")];

    // Change the capacity to 60.
    assert!(enc.encoder.set_max_capacity(60).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_60);

    enc.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Send a header block that refers to the unacked entry.
    let block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic(&block);

    // Shrinking the table would evict the entry, so it fails.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_err());

    // Receive an insert count increment for the entry.
    recv_instruction(&mut enc, &[0x01]);

    // Eviction still fails: the stream is still referring to the entry.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_err());

    // Receive a stream cancel for the stream.
    recv_instruction(&mut enc, STREAM_CANCELED_ID_1);

    // Now the entry can be evicted.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_ok());
}
1438
// An unreferenced entry can be evicted as soon as its insertion is acknowledged.
#[test]
fn dynamic_table_can_evict3() {
    let mut enc = connect(false);

    // Change the capacity to 60.
    assert!(enc.encoder.set_max_capacity(60).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_60);

    enc.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Shrinking the table would evict the entry; it fails because the entry is not acked.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_err());

    // Receive an insert count increment for the entry.
    recv_instruction(&mut enc, &[0x01]);

    // Now the entry can be evicted.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_ok());
}
1468
// A header ack alone is enough to make a referenced, unacked entry evictable, because
// it also acknowledges the insert instruction.
#[test]
fn dynamic_table_can_evict4() {
    let mut enc = connect(false);
    let headers = [Header::new("content-length", "1234")];

    // Change the capacity to 60.
    assert!(enc.encoder.set_max_capacity(60).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_60);

    enc.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234".
    assert!(enc
        .encoder
        .send_and_insert(&mut enc.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    enc.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Send a header block that refers to the unacked entry.
    let block = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers, 1)
        .unwrap();
    assert_is_index_to_dynamic(&block);

    // Shrinking the table would evict the entry; it fails because the stream is still
    // referring to the entry and the entry is not acked.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_err());

    // Receive a header ack for the header block. This also acks the insert instruction.
    recv_instruction(&mut enc, HEADER_ACK_STREAM_ID_1);

    // Now the entry can be evicted.
    let shrink = enc.change_capacity(10);
    assert!(shrink.is_ok());
}
1510
// When encoder instructions cannot be written because of flow control, headers are
// encoded as literals instead of being added to the dynamic table; insertion resumes
// after a flow control update.
#[test]
fn encoder_flow_controlled_blocked() {
    // Small flow control limit handed to `connect_flow_control`.
    const SMALL_MAX_DATA: u64 = 20;
    // The single encoder instruction expected after the first round of header blocks.
    const FIRST_ROUND_INSTRUCTION: &[u8] = &[
        0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x7f, 0x83, 0x8, 0x99, 0x6b,
    ];
    // The single encoder instruction expected after the flow control update.
    const SECOND_ROUND_INSTRUCTION: &[u8] = &[
        0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x37, 0x83, 0x8, 0x99, 0x6b,
    ];

    let mut enc = connect_flow_control(SMALL_MAX_DATA);

    // Set max_blocked_streams to 20 and change the capacity to 1000.
    enc.encoder.set_max_blocked_streams(20).unwrap();
    assert!(enc.encoder.set_max_capacity(1000).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_1000);

    // Encode a header block with 2 headers. The first header is added to the dynamic
    // table. The second is not, because the corresponding instruction cannot be written
    // immediately due to the flow control limit.
    let headers_stream_1 = [
        Header::new("something", "1234"),
        Header::new("something2", "12345678910"),
    ];
    let block_1 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers_stream_1, 1)
        .unwrap();

    // The first header is encoded as an index to the dynamic table (post-base form).
    assert_eq!(block_1[2], 0x10);
    // The second header is encoded as a literal with a literal name.
    assert_eq!(block_1[3] & 0xf0, 0x20);

    // Encode another header block. Here both headers are encoded as literals with
    // literal names; the first field line is checked.
    let headers_stream_2 = [
        Header::new("something3", "1234"),
        Header::new("something4", "12345678910"),
    ];
    let block_2 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers_stream_2, 2)
        .unwrap();
    assert_eq!(block_2[2] & 0xf0, 0x20);

    // Only one instruction has been sent: the insertion of ("something", "1234").
    enc.send_instructions(FIRST_ROUND_INSTRUCTION);

    // Exchange a flow control update.
    let peer_out = enc.peer_conn.process(None, now());
    mem::drop(enc.conn.process(peer_out.dgram(), now()));

    // Encode a new header block. Headers are added to the dynamic table again, because
    // instructions can be sent.
    let headers_stream_3 = [
        Header::new("something5", "1234"),
        Header::new("something6", "12345678910"),
    ];
    let block_3 = enc
        .encoder
        .encode_header_block(&mut enc.conn, &headers_stream_3, 3)
        .unwrap();

    // The first header is encoded as an index to the dynamic table (post-base form).
    assert_eq!(block_3[2], 0x10);
    // The second header is encoded as a literal with a literal name.
    assert_eq!(block_3[3] & 0xf0, 0x20);

    // Assert that exactly one more instruction has been sent.
    enc.send_instructions(SECOND_ROUND_INSTRUCTION);
}
1590
// Requesting a capacity of 2000 succeeds, but the capacity instruction that is sent is
// CAP_INSTRUCTION_1500.
// NOTE(review): presumably the encoder clamps the request to a 1500 limit configured
// in `connect` - confirm against the test setup.
#[test]
fn encoder_max_capacity_limit() {
    let mut enc = connect(false);

    // Ask for a capacity of 2000.
    let requested = enc.encoder.set_max_capacity(2000);
    assert!(requested.is_ok());

    // The instruction on the wire carries the limited capacity instead.
    enc.send_instructions(CAP_INSTRUCTION_1500);
}
1599
// Encoding a header block whose first header refers to an existing entry and whose
// second header is new must succeed with a table capacity of 50.
// NOTE(review): per the test name, the point is that the entry referenced by the block
// being encoded is not evicted to make room for the second header - confirm.
#[test]
fn test_do_not_evict_entry_that_are_referd_only_by_the_same_header_blocked_encoding() {
    let mut enc = connect(false);

    enc.encoder.set_max_blocked_streams(20).unwrap();
    assert!(enc.change_capacity(50).is_ok());

    // Insert ("something5", "1234") into the dynamic table.
    enc.encoder
        .send_and_insert(&mut enc.conn, b"something5", b"1234")
        .unwrap();

    // Flush the encoder instructions and deliver them to the peer.
    enc.encoder.send(&mut enc.conn).unwrap();
    let conn_out = enc.conn.process(None, now());
    mem::drop(enc.peer_conn.process(conn_out.dgram(), now()));

    // Receive an insert count increment.
    recv_instruction(&mut enc, &[0x01]);

    // Encode a block that uses the existing entry together with a new header.
    let headers = [
        Header::new("something5", "1234"),
        Header::new("something6", "1234"),
    ];
    let encoded = enc.encoder.encode_header_block(&mut enc.conn, &headers, 3);
    assert!(encoded.is_ok());
}
1630
// A stream cancel followed by an insert count increment must be processed cleanly.
// NOTE(review): there are no explicit assertions at the end; presumably
// `recv_instruction` fails the test if the encoder rejects an instruction - confirm.
#[test]
fn test_streams_cancel_cleans_up_unacked_header_blocks() {
    let mut enc = connect(false);

    enc.encoder.set_max_blocked_streams(10).unwrap();
    assert!(enc.change_capacity(60).is_ok());
    enc.send_instructions(CAP_INSTRUCTION_60);

    // Insert "content-length: 1234".
    enc.insert(
        HEADER_CONTENT_LENGTH,
        VALUE_1,
        HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
    );

    // Send a header block on stream 1 that refers to the dynamic entry.
    let headers = [Header::new("content-length", "1234")];
    enc.encode_header_block(1, &headers, ENCODE_INDEXED_REF_DYNAMIC, &[]);

    // Receive a stream cancel instruction for stream 1.
    recv_instruction(&mut enc, STREAM_CANCELED_ID_1);

    // Then receive an insert count increment.
    recv_instruction(&mut enc, &[0x01]);
}
1659 }
1660