1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5 use serde::{Deserialize, Deserializer, Serialize, Serializer};
6 use std::io;
7 use std::io::{Error, ErrorKind};
8 use std::sync::mpsc;
9
///
/// Handles the channel implementation when in-process channels are enabled.
///
13
14 pub type PayloadSender = MsgSender<Payload>;
15
16 pub type PayloadReceiver = MsgReceiver<Payload>;
17
18 impl PayloadSenderHelperMethods for PayloadSender {
send_payload(&self, payload: Payload) -> Result<(), Error>19 fn send_payload(&self, payload: Payload) -> Result<(), Error> {
20 self.send(payload)
21 }
22 }
23
24 impl PayloadReceiverHelperMethods for PayloadReceiver {
recv_payload(&self) -> Result<Payload, Error>25 fn recv_payload(&self) -> Result<Payload, Error> {
26 self.recv()
27 }
28
to_mpsc_receiver(self) -> Receiver<Payload>29 fn to_mpsc_receiver(self) -> Receiver<Payload> {
30 self.rx
31 }
32 }
33
/// Receiving half of an in-process message channel, wrapping a
/// `std::sync::mpsc::Receiver`.
pub struct MsgReceiver<T> {
    rx: mpsc::Receiver<T>,
}

impl<T> MsgReceiver<T> {
    /// Receives the next message from the underlying `mpsc` receiver.
    ///
    /// # Errors
    ///
    /// When the underlying `recv` call fails, returns an `io::Error` of kind
    /// `ErrorKind::Other` whose message is the receive error's text.
    pub fn recv(&self) -> Result<T, Error> {
        // The message is taken from `Display` (`to_string`) rather than the
        // deprecated `std::error::Error::description`; this also removes the
        // need for a local `use` that shadowed the `io::Error` import.
        self.rx
            .recv()
            .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
    }
}
44
/// Sending half of an in-process message channel, wrapping a
/// `std::sync::mpsc::Sender`.
pub struct MsgSender<T> {
    tx: mpsc::Sender<T>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound, but cloning the inner `mpsc::Sender<T>` never clones a `T`,
// so the sender can be cloned for any message type.
impl<T> Clone for MsgSender<T> {
    fn clone(&self) -> Self {
        MsgSender {
            tx: self.tx.clone(),
        }
    }
}

impl<T> MsgSender<T> {
    /// Sends `data` on the underlying `mpsc` sender.
    ///
    /// # Errors
    ///
    /// When the underlying `send` call fails, returns an `Error` of kind
    /// `ErrorKind::Other` with the message "cannot send on closed channel".
    pub fn send(&self, data: T) -> Result<(), Error> {
        self.tx
            .send(data)
            .map_err(|_| Error::new(ErrorKind::Other, "cannot send on closed channel"))
    }
}
55
payload_channel() -> Result<(PayloadSender, PayloadReceiver), Error>56 pub fn payload_channel() -> Result<(PayloadSender, PayloadReceiver), Error> {
57 let (tx, rx) = mpsc::channel();
58 Ok((PayloadSender { tx: tx }, PayloadReceiver { rx: rx }))
59 }
60
msg_channel<T>() -> Result<(MsgSender<T>, MsgReceiver<T>), Error>61 pub fn msg_channel<T>() -> Result<(MsgSender<T>, MsgReceiver<T>), Error> {
62 let (tx, rx) = mpsc::channel();
63 Ok((MsgSender { tx: tx }, MsgReceiver { rx: rx }))
64 }
65
66 ///
67 /// These serialize methods are needed to satisfy the compiler
68 /// which uses these implementations for IPC, and also for the
69 /// recording tool. The recording tool only outputs messages
70 /// that don't contain Senders or Receivers, so in theory
71 /// these should never be called in the in-process config.
72 /// If they are called, there may be a bug in the messages
73 /// that the replay tool is writing.
74 ///
75
76 impl<T> Serialize for MsgReceiver<T> {
serialize<S: Serializer>(&self, _: S) -> Result<S::Ok, S::Error>77 fn serialize<S: Serializer>(&self, _: S) -> Result<S::Ok, S::Error> {
78 unreachable!();
79 }
80 }
81
82 impl<T> Serialize for MsgSender<T> {
serialize<S: Serializer>(&self, _: S) -> Result<S::Ok, S::Error>83 fn serialize<S: Serializer>(&self, _: S) -> Result<S::Ok, S::Error> {
84 unreachable!();
85 }
86 }
87
88 impl<'de, T> Deserialize<'de> for MsgReceiver<T> {
deserialize<D>(_: D) -> Result<MsgReceiver<T>, D::Error> where D: Deserializer<'de>89 fn deserialize<D>(_: D) -> Result<MsgReceiver<T>, D::Error>
90 where D: Deserializer<'de> {
91 unreachable!();
92 }
93 }
94
95 impl<'de, T> Deserialize<'de> for MsgSender<T> {
deserialize<D>(_: D) -> Result<MsgSender<T>, D::Error> where D: Deserializer<'de>96 fn deserialize<D>(_: D) -> Result<MsgSender<T>, D::Error>
97 where D: Deserializer<'de> {
98 unreachable!();
99 }
100 }
101