1 // Copyright 2016 The xi-editor Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! Generic RPC handling (used for both front end and plugin communication).
16 //!
17 //! The RPC protocol is based on [JSON-RPC](http://www.jsonrpc.org/specification),
18 //! but with some modifications. Unlike JSON-RPC 2.0, requests and notifications
19 //! are allowed in both directions, rather than imposing client and server roles.
20 //! Further, the batch form is not supported.
21 //!
22 //! Because these changes make the protocol not fully compliant with the spec,
23 //! the `"jsonrpc"` member is omitted from request and response objects.
24 #![allow(clippy::boxed_local, clippy::or_fun_call)]
25 
26 #[macro_use]
27 extern crate serde_json;
28 #[macro_use]
29 extern crate serde_derive;
30 extern crate crossbeam;
31 extern crate serde;
32 extern crate xi_trace;
33 
34 #[macro_use]
35 extern crate log;
36 
37 mod error;
38 mod parse;
39 
40 pub mod test_utils;
41 
42 use std::cmp;
43 use std::collections::{BTreeMap, BinaryHeap, VecDeque};
44 use std::io::{self, BufRead, Write};
45 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46 use std::sync::mpsc;
47 use std::sync::{Arc, Condvar, Mutex};
48 use std::thread;
49 use std::time::{Duration, Instant};
50 
51 use serde::de::DeserializeOwned;
52 use serde_json::Value;
53 
54 use xi_trace::{trace, trace_block, trace_block_payload, trace_payload};
55 
56 pub use crate::error::{Error, ReadError, RemoteError};
57 use crate::parse::{Call, MessageReader, Response, RpcObject};
58 
/// The maximum duration the main loop will block waiting for an incoming
/// message before it goes back to checking timers and the idle queue.
const MAX_IDLE_WAIT: Duration = Duration::from_millis(5);
61 
/// An interface to access the other side of the RPC channel. The main purpose
/// is to send RPC requests and notifications to the peer.
///
/// A single shared `RawPeer` exists for each `RpcLoop`; a handle to it can
/// be obtained with `RpcLoop::get_raw_peer()`. Cloning a `RawPeer` clones
/// only the inner `Arc`, so every clone refers to the same state.
///
/// In general, `RawPeer` shouldn't be used directly, but behind a pointer as
/// the `Peer` trait object.
pub struct RawPeer<W: Write + 'static>(Arc<RpcState<W>>);
71 
/// The `Peer` trait represents the interface for the other side of the RPC
/// channel. It is intended to be used behind a pointer, a trait object.
pub trait Peer: Send + 'static {
    /// Used to implement `clone` in an object-safe way.
    /// For an explanation on this approach, see
    /// [this thread](https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6).
    fn box_clone(&self) -> Box<dyn Peer>;
    /// Sends a notification (asynchronous RPC) to the peer.
    fn send_rpc_notification(&self, method: &str, params: &Value);
    /// Sends a request asynchronously, and the supplied callback will
    /// be called when the response arrives.
    ///
    /// `Callback` is implemented for every `Send` closure of type
    /// `FnOnce(Result<Value, Error>)`; it must be boxed because trait
    /// objects cannot use generic parameters.
    fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>);
    /// Sends a request (synchronous RPC) to the peer, and waits for the result.
    fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>;
    /// Determines whether an incoming request (or notification) is
    /// pending. This is intended to reduce latency for bulk operations
    /// done in the background.
    fn request_is_pending(&self) -> bool;
    /// Adds a token to the idle queue. When the runloop is idle and the
    /// queue is not empty, the handler's `idle` fn will be called
    /// with the earliest added token.
    fn schedule_idle(&self, token: usize);
    /// Like `schedule_idle`, with the guarantee that the handler's `idle`
    /// fn will not be called _before_ the provided `Instant`.
    ///
    /// # Note
    ///
    /// This is not intended as a high-fidelity timer. Regular RPC messages
    /// will always take priority over an idle task.
    fn schedule_timer(&self, after: Instant, token: usize);
}
106 
/// The `Peer` trait object, boxed. It can be cloned, which goes through
/// `Peer::box_clone`.
pub type RpcPeer = Box<dyn Peer>;
109 
/// The context passed to every `Handler` method. It gives the handler
/// access to the peer on the other side of the RPC channel.
pub struct RpcCtx {
    // The boxed peer; exposed through `get_peer`.
    peer: RpcPeer,
}
113 
#[derive(Debug, Clone, Serialize, Deserialize)]
/// An RPC command.
///
/// This type is used as a placeholder in various places, and can be
/// used by clients as a catchall type for implementing `Handler`
/// (it is deserializable, as `Handler::Notification` and
/// `Handler::Request` are required to be).
pub struct RpcCall {
    /// The name of the method being called.
    pub method: String,
    /// The parameters of the call, as raw JSON.
    pub params: Value,
}
123 
/// A trait for types which can handle RPCs.
///
/// Incoming calls are deserialized into `Self::Notification` or
/// `Self::Request` (both must be `serde::de::DeserializeOwned`) before
/// being handed to the corresponding method.
///
/// NOTE(review): this comment used to refer to `MethodHandler` and to a
/// `Parser` trait. Neither is defined in this file, so those references
/// look stale -- confirm before relying on them.
pub trait Handler {
    /// The type incoming notifications are deserialized into.
    type Notification: DeserializeOwned;
    /// The type incoming requests are deserialized into.
    type Request: DeserializeOwned;
    /// Handles a notification received from the peer. No response is sent.
    fn handle_notification(&mut self, ctx: &RpcCtx, rpc: Self::Notification);
    /// Handles a request received from the peer. The returned value (or
    /// error) is sent back to the peer as the response to that request.
    fn handle_request(&mut self, ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError>;
    /// Called by the run loop when no incoming message is waiting, with a
    /// token that was scheduled through `schedule_idle` or whose timer
    /// (see `schedule_timer`) has expired. The default does nothing.
    #[allow(unused_variables)]
    fn idle(&mut self, ctx: &RpcCtx, token: usize) {}
}
137 
138 pub trait Callback: Send {
call(self: Box<Self>, result: Result<Value, Error>)139     fn call(self: Box<Self>, result: Result<Value, Error>);
140 }
141 
142 impl<F: Send + FnOnce(Result<Value, Error>)> Callback for F {
call(self: Box<F>, result: Result<Value, Error>)143     fn call(self: Box<F>, result: Result<Value, Error>) {
144         (*self)(result)
145     }
146 }
147 
148 /// A helper type which shuts down the runloop if a panic occurs while
149 /// handling an RPC.
150 struct PanicGuard<'a, W: Write + 'static>(&'a RawPeer<W>);
151 
152 impl<'a, W: Write + 'static> Drop for PanicGuard<'a, W> {
drop(&mut self)153     fn drop(&mut self) {
154         if thread::panicking() {
155             error!("panic guard hit, closing runloop");
156             self.0.disconnect();
157         }
158     }
159 }
160 
/// A one-shot procedure that is invoked with an idle token.
///
/// The method takes `Box<Self>` so that it can be invoked on a
/// `Box<dyn IdleProc>`.
trait IdleProc: Send {
    fn call(self: Box<Self>, token: usize);
}

/// Any sendable `FnOnce(usize)` closure is an `IdleProc`.
impl<F> IdleProc for F
where
    F: Send + FnOnce(usize),
{
    fn call(self: Box<F>, token: usize) {
        // Move the closure out of its box, then run it exactly once.
        let procedure: F = *self;
        procedure(token)
    }
}
170 
171 enum ResponseHandler {
172     Chan(mpsc::Sender<Result<Value, Error>>),
173     Callback(Box<dyn Callback>),
174 }
175 
176 impl ResponseHandler {
invoke(self, result: Result<Value, Error>)177     fn invoke(self, result: Result<Value, Error>) {
178         match self {
179             ResponseHandler::Chan(tx) => {
180                 let _ = tx.send(result);
181             }
182             ResponseHandler::Callback(f) => f.call(result),
183         }
184     }
185 }
186 
/// A scheduled timer, as created by `schedule_timer`.
#[derive(Debug, PartialEq, Eq)]
struct Timer {
    // The timer is not reported as expired before this instant.
    fire_after: Instant,
    // The token reported by `check_timers` once the timer has expired.
    token: usize,
}
192 
/// The state shared (through an `Arc`) by all clones of a `RawPeer`.
struct RpcState<W: Write> {
    // Incoming non-response messages, or the read error that ended the
    // read thread; filled by `put_rx`, drained by the main loop.
    rx_queue: Mutex<VecDeque<Result<RpcObject, ReadError>>>,
    // Notified by `put_rx`; waited on in `get_rx_timeout`.
    rx_cvar: Condvar,
    // The output stream; locked for the duration of each `send`.
    writer: Mutex<W>,
    // Counter from which outgoing request ids are taken.
    id: AtomicUsize,
    // Handlers for outgoing requests still awaiting a response, by id.
    pending: Mutex<BTreeMap<usize, ResponseHandler>>,
    // Tokens added by `schedule_idle`, consumed in FIFO order.
    idle_queue: Mutex<VecDeque<usize>>,
    // Timers added by `schedule_timer`; see the `Ord` impl on `Timer`
    // for the heap order.
    timers: Mutex<BinaryHeap<Timer>>,
    // Set by `disconnect`, cleared by `reset_needs_exit`; polled by the
    // read thread to know when to stop.
    needs_exit: AtomicBool,
    // Set when a synchronous request is sent; consulted by the read
    // thread when reading fails.
    // NOTE(review): nothing in this file ever sets this back to `false`.
    is_blocked: AtomicBool,
}
204 
/// A structure holding the state of a main loop for handling RPC's.
pub struct RpcLoop<W: Write + 'static> {
    // Reads incoming messages; used by the read thread in `mainloop`.
    reader: MessageReader,
    // The shared peer; clones are handed out by `get_raw_peer`.
    peer: RawPeer<W>,
}
210 
impl<W: Write + Send> RpcLoop<W> {
    /// Creates a new `RpcLoop` with the given output stream (which is used for
    /// sending requests and notifications, as well as responses).
    pub fn new(writer: W) -> Self {
        let rpc_peer = RawPeer(Arc::new(RpcState {
            rx_queue: Mutex::new(VecDeque::new()),
            rx_cvar: Condvar::new(),
            writer: Mutex::new(writer),
            id: AtomicUsize::new(0),
            pending: Mutex::new(BTreeMap::new()),
            idle_queue: Mutex::new(VecDeque::new()),
            timers: Mutex::new(BinaryHeap::new()),
            needs_exit: AtomicBool::new(false),
            is_blocked: AtomicBool::new(false),
        }));
        RpcLoop { reader: MessageReader::default(), peer: rpc_peer }
    }

    /// Returns a new handle to the peer; it shares its state with the
    /// handle owned by this `RpcLoop`.
    pub fn get_raw_peer(&self) -> RawPeer<W> {
        self.peer.clone()
    }

    /// Starts the event loop, reading lines from the reader until EOF,
    /// or an error occurs.
    ///
    /// Returns `Ok(())` in the EOF case, otherwise returns the
    /// underlying `ReadError`.
    ///
    /// # Note:
    /// The reader is supplied via a closure, as basically a workaround
    /// so that the reader doesn't have to be `Send`. Internally, the
    /// main loop starts a separate thread for I/O, and at startup that
    /// thread calls the given closure.
    ///
    /// Calls to the handler happen on the caller's thread.
    ///
    /// Calls to the handler are guaranteed to preserve the order as
    /// they appear on the channel. At the moment, there is no way
    /// for there to be more than one incoming request to be outstanding.
    pub fn mainloop<R, RF, H>(&mut self, rf: RF, handler: &mut H) -> Result<(), ReadError>
    where
        R: BufRead,
        RF: Send + FnOnce() -> R,
        H: Handler,
    {
        let exit = crossbeam::scope(|scope| {
            let peer = self.get_raw_peer();
            // Clear the exit flag before the read thread starts polling it.
            peer.reset_needs_exit();

            let ctx = RpcCtx { peer: Box::new(peer.clone()) };
            // The read thread: parses incoming messages, dispatches
            // responses itself and queues everything else for the main loop.
            scope.spawn(move |_| {
                let mut stream = rf();
                loop {
                    // The main thread cannot return while this thread is active;
                    // when the main thread wants to exit it sets this flag.
                    if self.peer.needs_exit() {
                        trace("read loop exit", &["rpc"]);
                        break;
                    }

                    let json = match self.reader.next(&mut stream) {
                        Ok(json) => json,
                        Err(err) => {
                            // If a synchronous request has been issued, fail all
                            // pending requests so their callers receive
                            // `PeerDisconnect` instead of waiting for a response.
                            if self.peer.0.is_blocked.load(Ordering::Acquire) {
                                error!("failed to parse response json: {}", err);
                                self.peer.disconnect();
                            }
                            // Hand the error to the main loop and stop reading.
                            self.peer.put_rx(Err(err));
                            break;
                        }
                    };
                    if json.is_response() {
                        let id = json.get_id().unwrap();
                        let _resp =
                            trace_block_payload("read loop response", &["rpc"], format!("{}", id));
                        match json.into_response() {
                            Ok(resp) => {
                                let resp = resp.map_err(Error::from);
                                self.peer.handle_response(id, resp);
                            }
                            Err(msg) => {
                                error!("failed to parse response: {}", msg);
                                self.peer.handle_response(id, Err(Error::InvalidResponse));
                            }
                        }
                    } else {
                        self.peer.put_rx(Ok(json));
                    }
                }
            });

            // The main loop: runs the handler on the caller's thread.
            loop {
                // If the handler panics, dropping this guard disconnects the peer.
                let _guard = PanicGuard(&peer);
                let read_result = next_read(&peer, handler, &ctx);
                let _trace = trace_block("main got msg", &["rpc"]);

                let json = match read_result {
                    Ok(json) => json,
                    Err(err) => {
                        trace_payload("main loop err", &["rpc"], err.to_string());
                        // finish idle work before disconnecting;
                        // this is mostly useful for integration tests.
                        if let Some(idle_token) = peer.try_get_idle() {
                            handler.idle(&ctx, idle_token);
                        }
                        peer.disconnect();
                        return err;
                    }
                };

                let method = json.get_method().map(String::from);
                match json.into_rpc::<H::Notification, H::Request>() {
                    Ok(Call::Request(id, cmd)) => {
                        let _t = trace_block_payload("handle request", &["rpc"], method.unwrap());
                        let result = handler.handle_request(&ctx, cmd);
                        peer.respond(result, id);
                    }
                    Ok(Call::Notification(cmd)) => {
                        let _t = trace_block_payload("handle notif", &["rpc"], method.unwrap());
                        handler.handle_notification(&ctx, cmd);
                    }
                    // The request could not be parsed: answer with the error
                    // and keep the loop running.
                    Ok(Call::InvalidRequest(id, err)) => peer.respond(Err(err), id),
                    Err(err) => {
                        trace_payload("read loop exit", &["rpc"], err.to_string());
                        peer.disconnect();
                        return ReadError::UnknownRequest(err);
                    }
                }
            }
        })
        .unwrap();

        // An error for which `is_disconnect()` holds is reported as success.
        if exit.is_disconnect() {
            Ok(())
        } else {
            Err(exit)
        }
    }
}
351 
352 /// Returns the next read result, checking for idle work when no
353 /// result is available.
next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError> where W: Write + Send, H: Handler,354 fn next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError>
355 where
356     W: Write + Send,
357     H: Handler,
358 {
359     loop {
360         if let Some(result) = peer.try_get_rx() {
361             return result;
362         }
363         // handle timers before general idle work
364         let time_to_next_timer = match peer.check_timers() {
365             Some(Ok(token)) => {
366                 do_idle(handler, ctx, token);
367                 continue;
368             }
369             Some(Err(duration)) => Some(duration),
370             None => None,
371         };
372 
373         if let Some(idle_token) = peer.try_get_idle() {
374             do_idle(handler, ctx, idle_token);
375             continue;
376         }
377 
378         // we don't want to block indefinitely if there's no current idle work,
379         // because idle work could be scheduled from another thread.
380         let idle_timeout = time_to_next_timer.unwrap_or(MAX_IDLE_WAIT).min(MAX_IDLE_WAIT);
381 
382         if let Some(result) = peer.get_rx_timeout(idle_timeout) {
383             return result;
384         }
385     }
386 }
387 
do_idle<H: Handler>(handler: &mut H, ctx: &RpcCtx, token: usize)388 fn do_idle<H: Handler>(handler: &mut H, ctx: &RpcCtx, token: usize) {
389     let _trace = trace_block_payload("do_idle", &["rpc"], format!("token: {}", token));
390     handler.idle(ctx, token);
391 }
392 
393 impl RpcCtx {
get_peer(&self) -> &RpcPeer394     pub fn get_peer(&self) -> &RpcPeer {
395         &self.peer
396     }
397 
398     /// Schedule the idle handler to be run when there are no requests pending.
schedule_idle(&self, token: usize)399     pub fn schedule_idle(&self, token: usize) {
400         self.peer.schedule_idle(token)
401     }
402 }
403 
404 impl<W: Write + Send + 'static> Peer for RawPeer<W> {
box_clone(&self) -> Box<dyn Peer>405     fn box_clone(&self) -> Box<dyn Peer> {
406         Box::new((*self).clone())
407     }
408 
send_rpc_notification(&self, method: &str, params: &Value)409     fn send_rpc_notification(&self, method: &str, params: &Value) {
410         let _trace = trace_block_payload("send notif", &["rpc"], method.to_owned());
411         if let Err(e) = self.send(&json!({
412             "method": method,
413             "params": params,
414         })) {
415             error!("send error on send_rpc_notification method {}: {}", method, e);
416         }
417     }
418 
send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>)419     fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>) {
420         let _trace = trace_block_payload("send req async", &["rpc"], method.to_owned());
421         self.send_rpc_request_common(method, params, ResponseHandler::Callback(f));
422     }
423 
send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>424     fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
425         let _trace = trace_block_payload("send req sync", &["rpc"], method.to_owned());
426         self.0.is_blocked.store(true, Ordering::Release);
427         let (tx, rx) = mpsc::channel();
428         self.send_rpc_request_common(method, params, ResponseHandler::Chan(tx));
429         rx.recv().unwrap_or(Err(Error::PeerDisconnect))
430     }
431 
request_is_pending(&self) -> bool432     fn request_is_pending(&self) -> bool {
433         let queue = self.0.rx_queue.lock().unwrap();
434         !queue.is_empty()
435     }
436 
schedule_idle(&self, token: usize)437     fn schedule_idle(&self, token: usize) {
438         self.0.idle_queue.lock().unwrap().push_back(token);
439     }
440 
schedule_timer(&self, after: Instant, token: usize)441     fn schedule_timer(&self, after: Instant, token: usize) {
442         self.0.timers.lock().unwrap().push(Timer { fire_after: after, token });
443     }
444 }
445 
446 impl<W: Write> RawPeer<W> {
send(&self, v: &Value) -> Result<(), io::Error>447     fn send(&self, v: &Value) -> Result<(), io::Error> {
448         let _trace = trace_block("send", &["rpc"]);
449         let mut s = serde_json::to_string(v).unwrap();
450         s.push('\n');
451         self.0.writer.lock().unwrap().write_all(s.as_bytes())
452         // Technically, maybe we should flush here, but doesn't seem to be required.
453     }
454 
respond(&self, result: Response, id: u64)455     fn respond(&self, result: Response, id: u64) {
456         let mut response = json!({ "id": id });
457         match result {
458             Ok(result) => response["result"] = result,
459             Err(error) => response["error"] = json!(error),
460         };
461         if let Err(e) = self.send(&response) {
462             error!("error {} sending response to RPC {:?}", e, id);
463         }
464     }
465 
send_rpc_request_common(&self, method: &str, params: &Value, rh: ResponseHandler)466     fn send_rpc_request_common(&self, method: &str, params: &Value, rh: ResponseHandler) {
467         let id = self.0.id.fetch_add(1, Ordering::Relaxed);
468         {
469             let mut pending = self.0.pending.lock().unwrap();
470             pending.insert(id, rh);
471         }
472         if let Err(e) = self.send(&json!({
473             "id": id,
474             "method": method,
475             "params": params,
476         })) {
477             let mut pending = self.0.pending.lock().unwrap();
478             if let Some(rh) = pending.remove(&id) {
479                 rh.invoke(Err(Error::Io(e)));
480             }
481         }
482     }
483 
handle_response(&self, id: u64, resp: Result<Value, Error>)484     fn handle_response(&self, id: u64, resp: Result<Value, Error>) {
485         let id = id as usize;
486         let handler = {
487             let mut pending = self.0.pending.lock().unwrap();
488             pending.remove(&id)
489         };
490         match handler {
491             Some(responsehandler) => responsehandler.invoke(resp),
492             None => warn!("id {} not found in pending", id),
493         }
494     }
495 
496     /// Get a message from the receive queue if available.
try_get_rx(&self) -> Option<Result<RpcObject, ReadError>>497     fn try_get_rx(&self) -> Option<Result<RpcObject, ReadError>> {
498         let mut queue = self.0.rx_queue.lock().unwrap();
499         queue.pop_front()
500     }
501 
502     /// Get a message from the receive queue, waiting for at most `Duration`
503     /// and returning `None` if no message is available.
get_rx_timeout(&self, dur: Duration) -> Option<Result<RpcObject, ReadError>>504     fn get_rx_timeout(&self, dur: Duration) -> Option<Result<RpcObject, ReadError>> {
505         let mut queue = self.0.rx_queue.lock().unwrap();
506         let result = self.0.rx_cvar.wait_timeout(queue, dur).unwrap();
507         queue = result.0;
508         queue.pop_front()
509     }
510 
511     /// Adds a message to the receive queue. The message should only
512     /// be `None` if the read thread is exiting.
put_rx(&self, json: Result<RpcObject, ReadError>)513     fn put_rx(&self, json: Result<RpcObject, ReadError>) {
514         let mut queue = self.0.rx_queue.lock().unwrap();
515         queue.push_back(json);
516         self.0.rx_cvar.notify_one();
517     }
518 
try_get_idle(&self) -> Option<usize>519     fn try_get_idle(&self) -> Option<usize> {
520         self.0.idle_queue.lock().unwrap().pop_front()
521     }
522 
523     /// Checks status of the most imminent timer. If that timer has expired,
524     /// returns `Some(Ok(_))`, with the corresponding token.
525     /// If a timer exists but has not expired, returns `Some(Err(_))`,
526     /// with the error value being the `Duration` until the timer is ready.
527     /// Returns `None` if no timers are registered.
check_timers(&self) -> Option<Result<usize, Duration>>528     fn check_timers(&self) -> Option<Result<usize, Duration>> {
529         let mut timers = self.0.timers.lock().unwrap();
530         match timers.peek() {
531             None => return None,
532             Some(t) => {
533                 let now = Instant::now();
534                 if t.fire_after > now {
535                     return Some(Err(t.fire_after - now));
536                 }
537             }
538         }
539         Some(Ok(timers.pop().unwrap().token))
540     }
541 
542     /// send disconnect error to pending requests.
disconnect(&self)543     fn disconnect(&self) {
544         let mut pending = self.0.pending.lock().unwrap();
545         let ids = pending.keys().cloned().collect::<Vec<_>>();
546         for id in &ids {
547             let callback = pending.remove(id).unwrap();
548             callback.invoke(Err(Error::PeerDisconnect));
549         }
550         self.0.needs_exit.store(true, Ordering::Relaxed);
551     }
552 
553     /// Returns `true` if an error has occured in the main thread.
needs_exit(&self) -> bool554     fn needs_exit(&self) -> bool {
555         self.0.needs_exit.load(Ordering::Relaxed)
556     }
557 
reset_needs_exit(&self)558     fn reset_needs_exit(&self) {
559         self.0.needs_exit.store(false, Ordering::SeqCst);
560     }
561 }
562 
563 impl Clone for Box<dyn Peer> {
clone(&self) -> Box<dyn Peer>564     fn clone(&self) -> Box<dyn Peer> {
565         self.box_clone()
566     }
567 }
568 
569 impl<W: Write> Clone for RawPeer<W> {
clone(&self) -> Self570     fn clone(&self) -> Self {
571         RawPeer(self.0.clone())
572     }
573 }
574 
575 //NOTE: for our timers to work with Rust's BinaryHeap we want to reverse
576 //the default comparison; smaller `Instant`'s are considered 'greater'.
577 impl Ord for Timer {
cmp(&self, other: &Timer) -> cmp::Ordering578     fn cmp(&self, other: &Timer) -> cmp::Ordering {
579         other.fire_after.cmp(&self.fire_after)
580     }
581 }
582 
583 impl PartialOrd for Timer {
partial_cmp(&self, other: &Timer) -> Option<cmp::Ordering>584     fn partial_cmp(&self, other: &Timer) -> Option<cmp::Ordering> {
585         Some(self.cmp(other))
586     }
587 }
588 
#[cfg(test)]
mod tests {
    use super::*;

    /// A message with a method but no id parses as a notification.
    #[test]
    fn test_parse_notif() {
        let reader = MessageReader::default();
        let parsed = reader.parse(r#"{"method": "hi", "params": {"words": "plz"}}"#).unwrap();
        assert!(!parsed.is_response());
        if let Call::Notification(_) = parsed.into_rpc::<Value, Value>().unwrap() {
            return;
        }
        panic!("parse failed");
    }

    /// A message with both an id and a method parses as a request.
    #[test]
    fn test_parse_req() {
        let reader = MessageReader::default();
        let parsed =
            reader.parse(r#"{"id": 5, "method": "hi", "params": {"words": "plz"}}"#).unwrap();
        assert!(!parsed.is_response());
        if let Call::Request(..) = parsed.into_rpc::<Value, Value>().unwrap() {
            return;
        }
        panic!("parse failed");
    }

    /// Malformed JSON and non-object JSON are rejected with distinct errors.
    #[test]
    fn test_parse_bad_json() {
        let reader = MessageReader::default();

        // the "params" key is not quoted
        let malformed =
            reader.parse(r#"{"id": 5, "method": "hi", params: {"words": "plz"}}"#).err().unwrap();
        match malformed {
            ReadError::Json(..) => {}
            _ => panic!("parse failed"),
        }

        // valid JSON, but not an object
        let not_object = reader.parse(r#"[5, "hi", {"arg": "val"}]"#).err().unwrap();
        match not_object {
            ReadError::NotObject => {}
            _ => panic!("parse failed"),
        }
    }
}
638