1 use crate::codec::UserError;
2 use crate::frame::Reason;
3 use crate::proto::{self, WindowSize};
4 
5 use bytes::{Buf, Bytes};
6 use http::HeaderMap;
7 
8 use crate::PollExt;
9 use std::fmt;
10 #[cfg(feature = "stream")]
11 use std::pin::Pin;
12 use std::task::{Context, Poll};
13 
/// Sends the body stream and trailers to the remote peer.
///
/// # Overview
///
/// A `SendStream` is provided by [`SendRequest`] and [`SendResponse`] once the
/// HTTP/2.0 message header has been sent. It is used to stream the message
/// body and send the message trailers. See method level documentation for more
/// details.
///
/// The `SendStream` instance is also used to manage outbound flow control.
///
/// If a `SendStream` is dropped without explicitly closing the send stream, a
/// `RST_STREAM` frame will be sent. This essentially cancels the request /
/// response exchange.
///
/// The ways to explicitly close the send stream are:
///
/// * Set `end_of_stream` to true when calling [`send_request`],
///   [`send_response`], or [`send_data`].
/// * Send trailers with [`send_trailers`].
/// * Explicitly reset the stream with [`send_reset`].
///
/// # Flow control
///
/// In HTTP/2.0, data cannot be sent to the remote peer unless there is
/// available window capacity on both the stream and the connection. When a data
/// frame is sent, both the stream window and the connection window are
/// decremented. When the stream level window reaches zero, no further data can
/// be sent on that stream. When the connection level window reaches zero, no
/// further data can be sent on any stream for that connection.
///
/// When the remote peer is ready to receive more data, it sends `WINDOW_UPDATE`
/// frames. These frames increment the windows. See the [specification] for more
/// details on the principles of HTTP/2.0 flow control.
///
/// The implications for sending data are that the caller **should** ensure that
/// both the stream and the connection have available window capacity before
/// loading the data to send into memory. The `SendStream` instance provides the
/// necessary APIs to perform this logic. This, however, is not an obligation.
/// If the caller attempts to send data on a stream when there is no available
/// window capacity, the library will buffer the data until capacity becomes
/// available, at which point the buffer will be flushed to the connection.
///
/// **NOTE**: There is no bound on the amount of data that the library will
/// buffer. If you are sending large amounts of data, you really should hook
/// into the flow control lifecycle. Otherwise, you risk using up significant
/// amounts of memory.
///
/// To hook into the flow control lifecycle, the caller signals to the library
/// that it intends to send data by calling [`reserve_capacity`], specifying the
/// amount of data, in octets, that the caller intends to send. After this,
/// [`poll_capacity`] is used to be notified when the requested capacity is
/// assigned to the stream. Once [`poll_capacity`] returns `Ready` with the number
/// of octets available to the stream, the caller is able to actually send the
/// data using [`send_data`].
///
/// Because there is also a connection level window that applies to **all**
/// streams on a connection, when capacity is assigned to a stream (indicated by
/// `poll_capacity` returning `Ready`), this capacity is reserved on the
/// connection and will **not** be assigned to any other stream. If data is
/// never written to the stream, that capacity is effectively lost to other
/// streams and this introduces the risk of deadlocking a connection.
///
/// To avoid throttling data on a connection, the caller should not reserve
/// capacity until ready to send data and once any capacity is assigned to the
/// stream, the caller should immediately send data consuming this capacity.
/// There is no guarantee as to when the full capacity requested will become
/// available. For example, if the caller requests 64 KB of data and 512 bytes
/// become available, the caller should immediately send 512 bytes of data.
///
/// See [`reserve_capacity`] documentation for more details.
///
/// [`SendRequest`]: client/struct.SendRequest.html
/// [`SendResponse`]: server/struct.SendResponse.html
/// [specification]: http://httpwg.org/specs/rfc7540.html#FlowControl
/// [`reserve_capacity`]: #method.reserve_capacity
/// [`poll_capacity`]: #method.poll_capacity
/// [`send_data`]: #method.send_data
/// [`send_request`]: client/struct.SendRequest.html#method.send_request
/// [`send_response`]: server/struct.SendResponse.html#method.send_response
/// [`send_trailers`]: #method.send_trailers
/// [`send_reset`]: #method.send_reset
#[derive(Debug)]
pub struct SendStream<B: Buf> {
    // Crate-internal stream handle; every public method on `SendStream`
    // delegates to it.
    inner: proto::StreamRef<B>,
}
101 
/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
///
/// Streams are identified with an unsigned 31-bit integer. Streams
/// initiated by a client MUST use odd-numbered stream identifiers; those
/// initiated by the server MUST use even-numbered stream identifiers. A
/// stream identifier of zero (0x0) is used for connection control
/// messages; the stream identifier of zero cannot be used to establish a
/// new stream.
///
/// This type is an opaque wrapper: the raw `u32` value is stored in a private
/// field.
///
/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct StreamId(u32);
114 
/// Receives the body stream and trailers from the remote peer.
///
/// A `RecvStream` is provided by [`client::ResponseFuture`] and
/// [`server::Connection`] with the received HTTP/2.0 message head (the response
/// and request head respectively).
///
/// A `RecvStream` instance is used to receive the streaming message body and
/// any trailers from the remote peer. It is also used to manage inbound flow
/// control.
///
/// When the `stream` cargo feature is enabled, `RecvStream` also implements
/// `futures_core::Stream`, yielding the same items as `poll_data`.
///
/// See method level documentation for more details on receiving data. See
/// [`FlowControl`] for more details on inbound flow control.
///
/// [`client::ResponseFuture`]: client/struct.ResponseFuture.html
/// [`server::Connection`]: server/struct.Connection.html
/// [`FlowControl`]: struct.FlowControl.html
#[must_use = "streams do nothing unless polled"]
pub struct RecvStream {
    // Flow-control handle for this stream. Its own inner stream reference is
    // also what data and trailers are polled from.
    inner: FlowControl,
}
136 
/// A handle to release window capacity to a remote stream.
///
/// This type allows the caller to manage inbound data [flow control]. The
/// caller is expected to call [`release_capacity`] after dropping data frames.
///
/// # Overview
///
/// Each stream has a window size. This window size is the maximum amount of
/// inbound data that can be in-flight. In-flight data is defined as data that
/// has been received, but not yet released.
///
/// When a stream is created, the window size is set to the connection's initial
/// window size value. When a data frame is received, the window size is then
/// decremented by size of the data frame before the data is provided to the
/// caller. As the caller finishes using the data, [`release_capacity`] must be
/// called. This will then increment the window size again, allowing the peer to
/// send more data.
///
/// There is also a connection level window as well as the stream level window.
/// Received data counts against the connection level window as well and calls
/// to [`release_capacity`] will also increment the connection level window.
///
/// # Sending `WINDOW_UPDATE` frames
///
/// `WINDOW_UPDATE` frames will not be sent out for **every** call to
/// `release_capacity`, as this would end up slowing down the protocol. Instead,
/// `h2` waits until the window size is increased to a certain threshold and
/// then sends out a single `WINDOW_UPDATE` frame representing all the calls to
/// `release_capacity` since the last `WINDOW_UPDATE` frame.
///
/// This essentially batches window updating.
///
/// # Scenarios
///
/// Following is a basic scenario with an HTTP/2.0 connection containing a
/// single active stream.
///
/// * A new stream is activated. The receive window is initialized to 1024 (the
///   value of the initial window size for this connection).
/// * A `DATA` frame is received containing a payload of 600 bytes.
/// * The receive window size is reduced to 424 bytes.
/// * [`release_capacity`] is called with 200.
/// * The receive window size is now 624 bytes. The peer may send no more than
///   this.
/// * A `DATA` frame is received with a payload of 624 bytes.
/// * The window size is now 0 bytes. The peer may not send any more data.
/// * [`release_capacity`] is called with 1024.
/// * The receive window size is now 1024 bytes. The peer may now send more
///   data.
///
/// [flow control]: ../index.html#flow-control
/// [`release_capacity`]: struct.FlowControl.html#method.release_capacity
#[derive(Clone, Debug)]
pub struct FlowControl {
    // Crate-internal stream reference that all capacity queries and releases
    // are forwarded to.
    inner: proto::OpaqueStreamRef,
}
193 
/// A handle to send and receive PING frames with the peer.
///
/// A ping is sent with `send_ping` (or the async `ping` convenience method)
/// and its acknowledgement is observed through `poll_pong`.
// NOT Clone on purpose
pub struct PingPong {
    // Crate-internal handle that `send_ping` and `poll_pong` delegate to.
    inner: proto::UserPings,
}
199 
/// Sent via [`PingPong`][] to send a PING frame to a peer.
///
/// Instances are created with `Ping::opaque`.
///
/// [`PingPong`]: struct.PingPong.html
pub struct Ping {
    // Private zero-sized field: prevents construction with a struct literal
    // from outside this module.
    _p: (),
}
206 
/// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][].
///
/// [`PingPong`]: struct.PingPong.html
/// [`Ping`]: struct.Ping.html
pub struct Pong {
    // Private zero-sized field: prevents construction with a struct literal
    // from outside this module.
    _p: (),
}
214 
215 // ===== impl SendStream =====
216 
217 impl<B: Buf> SendStream<B> {
new(inner: proto::StreamRef<B>) -> Self218     pub(crate) fn new(inner: proto::StreamRef<B>) -> Self {
219         SendStream { inner }
220     }
221 
222     /// Requests capacity to send data.
223     ///
224     /// This function is used to express intent to send data. This requests
225     /// connection level capacity. Once the capacity is available, it is
226     /// assigned to the stream and not reused by other streams.
227     ///
228     /// This function may be called repeatedly. The `capacity` argument is the
229     /// **total** amount of requested capacity. Sequential calls to
230     /// `reserve_capacity` are *not* additive. Given the following:
231     ///
232     /// ```rust
233     /// # use h2::*;
234     /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
235     /// send_stream.reserve_capacity(100);
236     /// send_stream.reserve_capacity(200);
237     /// # }
238     /// ```
239     ///
240     /// After the second call to `reserve_capacity`, the *total* requested
241     /// capacity will be 200.
242     ///
243     /// `reserve_capacity` is also used to cancel previous capacity requests.
244     /// Given the following:
245     ///
246     /// ```rust
247     /// # use h2::*;
248     /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
249     /// send_stream.reserve_capacity(100);
250     /// send_stream.reserve_capacity(0);
251     /// # }
252     /// ```
253     ///
254     /// After the second call to `reserve_capacity`, the *total* requested
255     /// capacity will be 0, i.e. there is no requested capacity for the stream.
256     ///
257     /// If `reserve_capacity` is called with a lower value than the amount of
258     /// capacity **currently** assigned to the stream, this capacity will be
259     /// returned to the connection to be re-assigned to other streams.
260     ///
261     /// Also, the amount of capacity that is reserved gets decremented as data
262     /// is sent. For example:
263     ///
264     /// ```rust
265     /// # use h2::*;
266     /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) {
267     /// send_stream.reserve_capacity(100);
268     ///
269     /// send_stream.send_data(b"hello", false).unwrap();
270     /// // At this point, the total amount of requested capacity is 95 bytes.
271     ///
272     /// // Calling `reserve_capacity` with `100` again essentially requests an
273     /// // additional 5 bytes.
274     /// send_stream.reserve_capacity(100);
275     /// # }
276     /// ```
277     ///
278     /// See [Flow control](struct.SendStream.html#flow-control) for an overview
279     /// of how send flow control works.
reserve_capacity(&mut self, capacity: usize)280     pub fn reserve_capacity(&mut self, capacity: usize) {
281         // TODO: Check for overflow
282         self.inner.reserve_capacity(capacity as WindowSize)
283     }
284 
285     /// Returns the stream's current send capacity.
286     ///
287     /// This allows the caller to check the current amount of available capacity
288     /// before sending data.
capacity(&self) -> usize289     pub fn capacity(&self) -> usize {
290         self.inner.capacity() as usize
291     }
292 
293     /// Requests to be notified when the stream's capacity increases.
294     ///
295     /// Before calling this, capacity should be requested with
296     /// `reserve_capacity`. Once capacity is requested, the connection will
297     /// assign capacity to the stream **as it becomes available**. There is no
298     /// guarantee as to when and in what increments capacity gets assigned to
299     /// the stream.
300     ///
301     /// To get notified when the available capacity increases, the caller calls
302     /// `poll_capacity`, which returns `Ready(Some(n))` when `n` has been
303     /// increased by the connection. Note that `n` here represents the **total**
304     /// amount of assigned capacity at that point in time. It is also possible
305     /// that `n` is lower than the previous call if, since then, the caller has
306     /// sent data.
poll_capacity(&mut self, cx: &mut Context) -> Poll<Option<Result<usize, crate::Error>>>307     pub fn poll_capacity(&mut self, cx: &mut Context) -> Poll<Option<Result<usize, crate::Error>>> {
308         self.inner
309             .poll_capacity(cx)
310             .map_ok_(|w| w as usize)
311             .map_err_(Into::into)
312     }
313 
314     /// Sends a single data frame to the remote peer.
315     ///
316     /// This function may be called repeatedly as long as `end_of_stream` is set
317     /// to `false`. Setting `end_of_stream` to `true` sets the end stream flag
318     /// on the data frame. Any further calls to `send_data` or `send_trailers`
319     /// will return an [`Error`].
320     ///
321     /// `send_data` can be called without reserving capacity. In this case, the
322     /// data is buffered and the capacity is implicitly requested. Once the
323     /// capacity becomes available, the data is flushed to the connection.
324     /// However, this buffering is unbounded. As such, sending large amounts of
325     /// data without reserving capacity before hand could result in large
326     /// amounts of data being buffered in memory.
327     ///
328     /// [`Error`]: struct.Error.html
send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::Error>329     pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::Error> {
330         self.inner
331             .send_data(data, end_of_stream)
332             .map_err(Into::into)
333     }
334 
335     /// Sends trailers to the remote peer.
336     ///
337     /// Sending trailers implicitly closes the send stream. Once the send stream
338     /// is closed, no more data can be sent.
send_trailers(&mut self, trailers: HeaderMap) -> Result<(), crate::Error>339     pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), crate::Error> {
340         self.inner.send_trailers(trailers).map_err(Into::into)
341     }
342 
343     /// Resets the stream.
344     ///
345     /// This cancels the request / response exchange. If the response has not
346     /// yet been received, the associated `ResponseFuture` will return an
347     /// [`Error`] to reflect the canceled exchange.
348     ///
349     /// [`Error`]: struct.Error.html
send_reset(&mut self, reason: Reason)350     pub fn send_reset(&mut self, reason: Reason) {
351         self.inner.send_reset(reason)
352     }
353 
354     /// Polls to be notified when the client resets this stream.
355     ///
356     /// If stream is still open, this returns `Poll::Pending`, and
357     /// registers the task to be notified if a `RST_STREAM` is received.
358     ///
359     /// If a `RST_STREAM` frame is received for this stream, calling this
360     /// method will yield the `Reason` for the reset.
361     ///
362     /// # Error
363     ///
364     /// If connection sees an error, this returns that error instead of a
365     /// `Reason`.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>366     pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
367         self.inner.poll_reset(cx, proto::PollReset::Streaming)
368     }
369 
370     /// Returns the stream ID of this `SendStream`.
371     ///
372     /// # Panics
373     ///
374     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> StreamId375     pub fn stream_id(&self) -> StreamId {
376         StreamId::from_internal(self.inner.stream_id())
377     }
378 }
379 
380 // ===== impl StreamId =====
381 
382 impl StreamId {
from_internal(id: crate::frame::StreamId) -> Self383     pub(crate) fn from_internal(id: crate::frame::StreamId) -> Self {
384         StreamId(id.into())
385     }
386 }
387 // ===== impl RecvStream =====
388 
389 impl RecvStream {
new(inner: FlowControl) -> Self390     pub(crate) fn new(inner: FlowControl) -> Self {
391         RecvStream { inner }
392     }
393 
394     /// Get the next data frame.
data(&mut self) -> Option<Result<Bytes, crate::Error>>395     pub async fn data(&mut self) -> Option<Result<Bytes, crate::Error>> {
396         futures_util::future::poll_fn(move |cx| self.poll_data(cx)).await
397     }
398 
399     /// Get optional trailers for this stream.
trailers(&mut self) -> Result<Option<HeaderMap>, crate::Error>400     pub async fn trailers(&mut self) -> Result<Option<HeaderMap>, crate::Error> {
401         futures_util::future::poll_fn(move |cx| self.poll_trailers(cx)).await
402     }
403 
404     /// Poll for the next data frame.
poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>>405     pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>> {
406         self.inner.inner.poll_data(cx).map_err_(Into::into)
407     }
408 
409     #[doc(hidden)]
poll_trailers( &mut self, cx: &mut Context, ) -> Poll<Result<Option<HeaderMap>, crate::Error>>410     pub fn poll_trailers(
411         &mut self,
412         cx: &mut Context,
413     ) -> Poll<Result<Option<HeaderMap>, crate::Error>> {
414         match ready!(self.inner.inner.poll_trailers(cx)) {
415             Some(Ok(map)) => Poll::Ready(Ok(Some(map))),
416             Some(Err(e)) => Poll::Ready(Err(e.into())),
417             None => Poll::Ready(Ok(None)),
418         }
419     }
420 
421     /// Returns true if the receive half has reached the end of stream.
422     ///
423     /// A return value of `true` means that calls to `poll` and `poll_trailers`
424     /// will both return `None`.
is_end_stream(&self) -> bool425     pub fn is_end_stream(&self) -> bool {
426         self.inner.inner.is_end_stream()
427     }
428 
429     /// Get a mutable reference to this stream's `FlowControl`.
430     ///
431     /// It can be used immediately, or cloned to be used later.
flow_control(&mut self) -> &mut FlowControl432     pub fn flow_control(&mut self) -> &mut FlowControl {
433         &mut self.inner
434     }
435 
436     /// Returns the stream ID of this stream.
437     ///
438     /// # Panics
439     ///
440     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> StreamId441     pub fn stream_id(&self) -> StreamId {
442         self.inner.stream_id()
443     }
444 }
445 
// Only available with the `stream` feature: exposes the data frames of a
// `RecvStream` through the `futures_core::Stream` interface.
#[cfg(feature = "stream")]
impl futures_core::Stream for RecvStream {
    type Item = Result<Bytes, crate::Error>;

    /// Yields the next data frame by delegating to `RecvStream::poll_data`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this: &mut RecvStream = &mut *self;
        this.poll_data(cx)
    }
}
454 
455 impl fmt::Debug for RecvStream {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result456     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
457         fmt.debug_struct("RecvStream")
458             .field("inner", &self.inner)
459             .finish()
460     }
461 }
462 
463 impl Drop for RecvStream {
drop(&mut self)464     fn drop(&mut self) {
465         // Eagerly clear any received DATA frames now, since its no longer
466         // possible to retrieve them. However, this will be called
467         // again once *all* stream refs have been dropped, since
468         // this won't send a RST_STREAM frame, in case the user wishes to
469         // still *send* DATA.
470         self.inner.inner.clear_recv_buffer();
471     }
472 }
473 
474 // ===== impl FlowControl =====
475 
476 impl FlowControl {
new(inner: proto::OpaqueStreamRef) -> Self477     pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self {
478         FlowControl { inner }
479     }
480 
481     /// Returns the stream ID of the stream whose capacity will
482     /// be released by this `FlowControl`.
stream_id(&self) -> StreamId483     pub fn stream_id(&self) -> StreamId {
484         StreamId::from_internal(self.inner.stream_id())
485     }
486 
487     /// Get the current available capacity of data this stream *could* receive.
available_capacity(&self) -> isize488     pub fn available_capacity(&self) -> isize {
489         self.inner.available_recv_capacity()
490     }
491 
492     /// Get the currently *used* capacity for this stream.
493     ///
494     /// This is the amount of bytes that can be released back to the remote.
used_capacity(&self) -> usize495     pub fn used_capacity(&self) -> usize {
496         self.inner.used_recv_capacity() as usize
497     }
498 
499     /// Release window capacity back to remote stream.
500     ///
501     /// This releases capacity back to the stream level and the connection level
502     /// windows. Both window sizes will be increased by `sz`.
503     ///
504     /// See [struct level] documentation for more details.
505     ///
506     /// # Errors
507     ///
508     /// This function errors if increasing the receive window size by `sz` would
509     /// result in a window size greater than the target window size. In other
510     /// words, the caller cannot release more capacity than data has been
511     /// received. If 1024 bytes of data have been received, at most 1024 bytes
512     /// can be released.
513     ///
514     /// [struct level]: #
release_capacity(&mut self, sz: usize) -> Result<(), crate::Error>515     pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::Error> {
516         if sz > proto::MAX_WINDOW_SIZE as usize {
517             return Err(UserError::ReleaseCapacityTooBig.into());
518         }
519         self.inner
520             .release_capacity(sz as proto::WindowSize)
521             .map_err(Into::into)
522     }
523 }
524 
525 // ===== impl PingPong =====
526 
527 impl PingPong {
new(inner: proto::UserPings) -> Self528     pub(crate) fn new(inner: proto::UserPings) -> Self {
529         PingPong { inner }
530     }
531 
532     /// Send a PING frame and wait for the peer to send the pong.
ping(&mut self, ping: Ping) -> Result<Pong, crate::Error>533     pub async fn ping(&mut self, ping: Ping) -> Result<Pong, crate::Error> {
534         self.send_ping(ping)?;
535         futures_util::future::poll_fn(|cx| self.poll_pong(cx)).await
536     }
537 
538     #[doc(hidden)]
send_ping(&mut self, ping: Ping) -> Result<(), crate::Error>539     pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::Error> {
540         // Passing a `Ping` here is just to be forwards-compatible with
541         // eventually allowing choosing a ping payload. For now, we can
542         // just drop it.
543         drop(ping);
544 
545         self.inner.send_ping().map_err(|err| match err {
546             Some(err) => err.into(),
547             None => UserError::SendPingWhilePending.into(),
548         })
549     }
550 
551     #[doc(hidden)]
poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::Error>>552     pub fn poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::Error>> {
553         ready!(self.inner.poll_pong(cx))?;
554         Poll::Ready(Ok(Pong { _p: () }))
555     }
556 }
557 
558 impl fmt::Debug for PingPong {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result559     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
560         fmt.debug_struct("PingPong").finish()
561     }
562 }
563 
564 // ===== impl Ping =====
565 
566 impl Ping {
567     /// Creates a new opaque `Ping` to be sent via a [`PingPong`][].
568     ///
569     /// The payload is "opaque", such that it shouldn't be depended on.
570     ///
571     /// [`PingPong`]: struct.PingPong.html
opaque() -> Ping572     pub fn opaque() -> Ping {
573         Ping { _p: () }
574     }
575 }
576 
577 impl fmt::Debug for Ping {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result578     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
579         fmt.debug_struct("Ping").finish()
580     }
581 }
582 
583 // ===== impl Pong =====
584 
585 impl fmt::Debug for Pong {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result586     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
587         fmt.debug_struct("Pong").finish()
588     }
589 }
590