1 // Copyright 2016 The xi-editor Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! Generic RPC handling (used for both front end and plugin communication).
16 //!
17 //! The RPC protocol is based on [JSON-RPC](http://www.jsonrpc.org/specification),
18 //! but with some modifications. Unlike JSON-RPC 2.0, requests and notifications
19 //! are allowed in both directions, rather than imposing client and server roles.
20 //! Further, the batch form is not supported.
21 //!
22 //! Because these changes make the protocol not fully compliant with the spec,
23 //! the `"jsonrpc"` member is omitted from request and response objects.
24 #![allow(clippy::boxed_local, clippy::or_fun_call)]
25
26 #[macro_use]
27 extern crate serde_json;
28 #[macro_use]
29 extern crate serde_derive;
30 extern crate crossbeam;
31 extern crate serde;
32 extern crate xi_trace;
33
34 #[macro_use]
35 extern crate log;
36
37 mod error;
38 mod parse;
39
40 pub mod test_utils;
41
42 use std::cmp;
43 use std::collections::{BTreeMap, BinaryHeap, VecDeque};
44 use std::io::{self, BufRead, Write};
45 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46 use std::sync::mpsc;
47 use std::sync::{Arc, Condvar, Mutex};
48 use std::thread;
49 use std::time::{Duration, Instant};
50
51 use serde::de::DeserializeOwned;
52 use serde_json::Value;
53
54 use xi_trace::{trace, trace_block, trace_block_payload, trace_payload};
55
56 pub use crate::error::{Error, ReadError, RemoteError};
57 use crate::parse::{Call, MessageReader, Response, RpcObject};
58
/// The maximum duration we will block on a reader before checking for a task.
60 const MAX_IDLE_WAIT: Duration = Duration::from_millis(5);
61
62 /// An interface to access the other side of the RPC channel. The main purpose
63 /// is to send RPC requests and notifications to the peer.
64 ///
65 /// A single shared `RawPeer` exists for each `RpcLoop`; a reference can
66 /// be taken with `RpcLoop::get_peer()`.
67 ///
68 /// In general, `RawPeer` shouldn't be used directly, but behind a pointer as
69 /// the `Peer` trait object.
70 pub struct RawPeer<W: Write + 'static>(Arc<RpcState<W>>);
71
72 /// The `Peer` trait represents the interface for the other side of the RPC
73 /// channel. It is intended to be used behind a pointer, a trait object.
74 pub trait Peer: Send + 'static {
75 /// Used to implement `clone` in an object-safe way.
76 /// For an explanation on this approach, see
77 /// [this thread](https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6).
box_clone(&self) -> Box<dyn Peer>78 fn box_clone(&self) -> Box<dyn Peer>;
79 /// Sends a notification (asynchronous RPC) to the peer.
send_rpc_notification(&self, method: &str, params: &Value)80 fn send_rpc_notification(&self, method: &str, params: &Value);
81 /// Sends a request asynchronously, and the supplied callback will
82 /// be called when the response arrives.
83 ///
84 /// `Callback` is an alias for `FnOnce(Result<Value, Error>)`; it must
85 /// be boxed because trait objects cannot use generic paramaters.
send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>)86 fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>);
87 /// Sends a request (synchronous RPC) to the peer, and waits for the result.
send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>88 fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>;
89 /// Determines whether an incoming request (or notification) is
90 /// pending. This is intended to reduce latency for bulk operations
91 /// done in the background.
request_is_pending(&self) -> bool92 fn request_is_pending(&self) -> bool;
93 /// Adds a token to the idle queue. When the runloop is idle and the
94 /// queue is not empty, the handler's `idle` fn will be called
95 /// with the earliest added token.
schedule_idle(&self, token: usize)96 fn schedule_idle(&self, token: usize);
97 /// Like `schedule_idle`, with the guarantee that the handler's `idle`
98 /// fn will not be called _before_ the provided `Instant`.
99 ///
100 /// # Note
101 ///
102 /// This is not intended as a high-fidelity timer. Regular RPC messages
103 /// will always take priority over an idle task.
schedule_timer(&self, after: Instant, token: usize)104 fn schedule_timer(&self, after: Instant, token: usize);
105 }
106
107 /// The `Peer` trait object.
108 pub type RpcPeer = Box<dyn Peer>;
109
110 pub struct RpcCtx {
111 peer: RpcPeer,
112 }
113
114 #[derive(Debug, Clone, Serialize, Deserialize)]
115 /// An RPC command.
116 ///
117 /// This type is used as a placeholder in various places, and can be
118 /// used by clients as a catchall type for implementing `MethodHandler`.
119 pub struct RpcCall {
120 pub method: String,
121 pub params: Value,
122 }
123
124 /// A trait for types which can handle RPCs.
125 ///
126 /// Types which implement `MethodHandler` are also responsible for implementing
127 /// `Parser`; `Parser` is provided when Self::Notification and Self::Request
128 /// can be used with serde::DeserializeOwned.
129 pub trait Handler {
130 type Notification: DeserializeOwned;
131 type Request: DeserializeOwned;
handle_notification(&mut self, ctx: &RpcCtx, rpc: Self::Notification)132 fn handle_notification(&mut self, ctx: &RpcCtx, rpc: Self::Notification);
handle_request(&mut self, ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError>133 fn handle_request(&mut self, ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError>;
134 #[allow(unused_variables)]
idle(&mut self, ctx: &RpcCtx, token: usize)135 fn idle(&mut self, ctx: &RpcCtx, token: usize) {}
136 }
137
138 pub trait Callback: Send {
call(self: Box<Self>, result: Result<Value, Error>)139 fn call(self: Box<Self>, result: Result<Value, Error>);
140 }
141
142 impl<F: Send + FnOnce(Result<Value, Error>)> Callback for F {
call(self: Box<F>, result: Result<Value, Error>)143 fn call(self: Box<F>, result: Result<Value, Error>) {
144 (*self)(result)
145 }
146 }
147
148 /// A helper type which shuts down the runloop if a panic occurs while
149 /// handling an RPC.
150 struct PanicGuard<'a, W: Write + 'static>(&'a RawPeer<W>);
151
152 impl<'a, W: Write + 'static> Drop for PanicGuard<'a, W> {
drop(&mut self)153 fn drop(&mut self) {
154 if thread::panicking() {
155 error!("panic guard hit, closing runloop");
156 self.0.disconnect();
157 }
158 }
159 }
160
/// A one-shot, sendable idle task that is run with an idle token.
trait IdleProc: Send {
    /// Consumes the boxed task and runs it with `token`.
    fn call(self: Box<Self>, token: usize);
}

/// Any `Send` closure taking a `usize` token is an `IdleProc`.
impl<F: Send + FnOnce(usize)> IdleProc for F {
    fn call(self: Box<F>, token: usize) {
        // Move the closure out of its box so it can be called by value.
        let task = *self;
        task(token)
    }
}
170
171 enum ResponseHandler {
172 Chan(mpsc::Sender<Result<Value, Error>>),
173 Callback(Box<dyn Callback>),
174 }
175
176 impl ResponseHandler {
invoke(self, result: Result<Value, Error>)177 fn invoke(self, result: Result<Value, Error>) {
178 match self {
179 ResponseHandler::Chan(tx) => {
180 let _ = tx.send(result);
181 }
182 ResponseHandler::Callback(f) => f.call(result),
183 }
184 }
185 }
186
/// A scheduled idle task: `token` becomes eligible to run once `fire_after`
/// has passed.
#[derive(Debug, PartialEq, Eq)]
struct Timer {
    /// The earliest instant at which the timer may fire.
    fire_after: Instant,
    /// The token handed to the handler's `idle` fn when the timer fires.
    token: usize,
}
192
193 struct RpcState<W: Write> {
194 rx_queue: Mutex<VecDeque<Result<RpcObject, ReadError>>>,
195 rx_cvar: Condvar,
196 writer: Mutex<W>,
197 id: AtomicUsize,
198 pending: Mutex<BTreeMap<usize, ResponseHandler>>,
199 idle_queue: Mutex<VecDeque<usize>>,
200 timers: Mutex<BinaryHeap<Timer>>,
201 needs_exit: AtomicBool,
202 is_blocked: AtomicBool,
203 }
204
205 /// A structure holding the state of a main loop for handling RPC's.
206 pub struct RpcLoop<W: Write + 'static> {
207 reader: MessageReader,
208 peer: RawPeer<W>,
209 }
210
211 impl<W: Write + Send> RpcLoop<W> {
212 /// Creates a new `RpcLoop` with the given output stream (which is used for
213 /// sending requests and notifications, as well as responses).
new(writer: W) -> Self214 pub fn new(writer: W) -> Self {
215 let rpc_peer = RawPeer(Arc::new(RpcState {
216 rx_queue: Mutex::new(VecDeque::new()),
217 rx_cvar: Condvar::new(),
218 writer: Mutex::new(writer),
219 id: AtomicUsize::new(0),
220 pending: Mutex::new(BTreeMap::new()),
221 idle_queue: Mutex::new(VecDeque::new()),
222 timers: Mutex::new(BinaryHeap::new()),
223 needs_exit: AtomicBool::new(false),
224 is_blocked: AtomicBool::new(false),
225 }));
226 RpcLoop { reader: MessageReader::default(), peer: rpc_peer }
227 }
228
229 /// Gets a reference to the peer.
get_raw_peer(&self) -> RawPeer<W>230 pub fn get_raw_peer(&self) -> RawPeer<W> {
231 self.peer.clone()
232 }
233
234 /// Starts the event loop, reading lines from the reader until EOF,
235 /// or an error occurs.
236 ///
237 /// Returns `Ok()` in the EOF case, otherwise returns the
238 /// underlying `ReadError`.
239 ///
240 /// # Note:
241 /// The reader is supplied via a closure, as basically a workaround
242 /// so that the reader doesn't have to be `Send`. Internally, the
243 /// main loop starts a separate thread for I/O, and at startup that
244 /// thread calls the given closure.
245 ///
246 /// Calls to the handler happen on the caller's thread.
247 ///
248 /// Calls to the handler are guaranteed to preserve the order as
249 /// they appear on on the channel. At the moment, there is no way
250 /// for there to be more than one incoming request to be outstanding.
mainloop<R, RF, H>(&mut self, rf: RF, handler: &mut H) -> Result<(), ReadError> where R: BufRead, RF: Send + FnOnce() -> R, H: Handler,251 pub fn mainloop<R, RF, H>(&mut self, rf: RF, handler: &mut H) -> Result<(), ReadError>
252 where
253 R: BufRead,
254 RF: Send + FnOnce() -> R,
255 H: Handler,
256 {
257 let exit = crossbeam::scope(|scope| {
258 let peer = self.get_raw_peer();
259 peer.reset_needs_exit();
260
261 let ctx = RpcCtx { peer: Box::new(peer.clone()) };
262 scope.spawn(move |_| {
263 let mut stream = rf();
264 loop {
265 // The main thread cannot return while this thread is active;
266 // when the main thread wants to exit it sets this flag.
267 if self.peer.needs_exit() {
268 trace("read loop exit", &["rpc"]);
269 break;
270 }
271
272 let json = match self.reader.next(&mut stream) {
273 Ok(json) => json,
274 Err(err) => {
275 if self.peer.0.is_blocked.load(Ordering::Acquire) {
276 error!("failed to parse response json: {}", err);
277 self.peer.disconnect();
278 }
279 self.peer.put_rx(Err(err));
280 break;
281 }
282 };
283 if json.is_response() {
284 let id = json.get_id().unwrap();
285 let _resp =
286 trace_block_payload("read loop response", &["rpc"], format!("{}", id));
287 match json.into_response() {
288 Ok(resp) => {
289 let resp = resp.map_err(Error::from);
290 self.peer.handle_response(id, resp);
291 }
292 Err(msg) => {
293 error!("failed to parse response: {}", msg);
294 self.peer.handle_response(id, Err(Error::InvalidResponse));
295 }
296 }
297 } else {
298 self.peer.put_rx(Ok(json));
299 }
300 }
301 });
302
303 loop {
304 let _guard = PanicGuard(&peer);
305 let read_result = next_read(&peer, handler, &ctx);
306 let _trace = trace_block("main got msg", &["rpc"]);
307
308 let json = match read_result {
309 Ok(json) => json,
310 Err(err) => {
311 trace_payload("main loop err", &["rpc"], err.to_string());
312 // finish idle work before disconnecting;
313 // this is mostly useful for integration tests.
314 if let Some(idle_token) = peer.try_get_idle() {
315 handler.idle(&ctx, idle_token);
316 }
317 peer.disconnect();
318 return err;
319 }
320 };
321
322 let method = json.get_method().map(String::from);
323 match json.into_rpc::<H::Notification, H::Request>() {
324 Ok(Call::Request(id, cmd)) => {
325 let _t = trace_block_payload("handle request", &["rpc"], method.unwrap());
326 let result = handler.handle_request(&ctx, cmd);
327 peer.respond(result, id);
328 }
329 Ok(Call::Notification(cmd)) => {
330 let _t = trace_block_payload("handle notif", &["rpc"], method.unwrap());
331 handler.handle_notification(&ctx, cmd);
332 }
333 Ok(Call::InvalidRequest(id, err)) => peer.respond(Err(err), id),
334 Err(err) => {
335 trace_payload("read loop exit", &["rpc"], err.to_string());
336 peer.disconnect();
337 return ReadError::UnknownRequest(err);
338 }
339 }
340 }
341 })
342 .unwrap();
343
344 if exit.is_disconnect() {
345 Ok(())
346 } else {
347 Err(exit)
348 }
349 }
350 }
351
352 /// Returns the next read result, checking for idle work when no
353 /// result is available.
next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError> where W: Write + Send, H: Handler,354 fn next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError>
355 where
356 W: Write + Send,
357 H: Handler,
358 {
359 loop {
360 if let Some(result) = peer.try_get_rx() {
361 return result;
362 }
363 // handle timers before general idle work
364 let time_to_next_timer = match peer.check_timers() {
365 Some(Ok(token)) => {
366 do_idle(handler, ctx, token);
367 continue;
368 }
369 Some(Err(duration)) => Some(duration),
370 None => None,
371 };
372
373 if let Some(idle_token) = peer.try_get_idle() {
374 do_idle(handler, ctx, idle_token);
375 continue;
376 }
377
378 // we don't want to block indefinitely if there's no current idle work,
379 // because idle work could be scheduled from another thread.
380 let idle_timeout = time_to_next_timer.unwrap_or(MAX_IDLE_WAIT).min(MAX_IDLE_WAIT);
381
382 if let Some(result) = peer.get_rx_timeout(idle_timeout) {
383 return result;
384 }
385 }
386 }
387
do_idle<H: Handler>(handler: &mut H, ctx: &RpcCtx, token: usize)388 fn do_idle<H: Handler>(handler: &mut H, ctx: &RpcCtx, token: usize) {
389 let _trace = trace_block_payload("do_idle", &["rpc"], format!("token: {}", token));
390 handler.idle(ctx, token);
391 }
392
393 impl RpcCtx {
get_peer(&self) -> &RpcPeer394 pub fn get_peer(&self) -> &RpcPeer {
395 &self.peer
396 }
397
398 /// Schedule the idle handler to be run when there are no requests pending.
schedule_idle(&self, token: usize)399 pub fn schedule_idle(&self, token: usize) {
400 self.peer.schedule_idle(token)
401 }
402 }
403
404 impl<W: Write + Send + 'static> Peer for RawPeer<W> {
box_clone(&self) -> Box<dyn Peer>405 fn box_clone(&self) -> Box<dyn Peer> {
406 Box::new((*self).clone())
407 }
408
send_rpc_notification(&self, method: &str, params: &Value)409 fn send_rpc_notification(&self, method: &str, params: &Value) {
410 let _trace = trace_block_payload("send notif", &["rpc"], method.to_owned());
411 if let Err(e) = self.send(&json!({
412 "method": method,
413 "params": params,
414 })) {
415 error!("send error on send_rpc_notification method {}: {}", method, e);
416 }
417 }
418
send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>)419 fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>) {
420 let _trace = trace_block_payload("send req async", &["rpc"], method.to_owned());
421 self.send_rpc_request_common(method, params, ResponseHandler::Callback(f));
422 }
423
send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>424 fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
425 let _trace = trace_block_payload("send req sync", &["rpc"], method.to_owned());
426 self.0.is_blocked.store(true, Ordering::Release);
427 let (tx, rx) = mpsc::channel();
428 self.send_rpc_request_common(method, params, ResponseHandler::Chan(tx));
429 rx.recv().unwrap_or(Err(Error::PeerDisconnect))
430 }
431
request_is_pending(&self) -> bool432 fn request_is_pending(&self) -> bool {
433 let queue = self.0.rx_queue.lock().unwrap();
434 !queue.is_empty()
435 }
436
schedule_idle(&self, token: usize)437 fn schedule_idle(&self, token: usize) {
438 self.0.idle_queue.lock().unwrap().push_back(token);
439 }
440
schedule_timer(&self, after: Instant, token: usize)441 fn schedule_timer(&self, after: Instant, token: usize) {
442 self.0.timers.lock().unwrap().push(Timer { fire_after: after, token });
443 }
444 }
445
446 impl<W: Write> RawPeer<W> {
send(&self, v: &Value) -> Result<(), io::Error>447 fn send(&self, v: &Value) -> Result<(), io::Error> {
448 let _trace = trace_block("send", &["rpc"]);
449 let mut s = serde_json::to_string(v).unwrap();
450 s.push('\n');
451 self.0.writer.lock().unwrap().write_all(s.as_bytes())
452 // Technically, maybe we should flush here, but doesn't seem to be required.
453 }
454
respond(&self, result: Response, id: u64)455 fn respond(&self, result: Response, id: u64) {
456 let mut response = json!({ "id": id });
457 match result {
458 Ok(result) => response["result"] = result,
459 Err(error) => response["error"] = json!(error),
460 };
461 if let Err(e) = self.send(&response) {
462 error!("error {} sending response to RPC {:?}", e, id);
463 }
464 }
465
send_rpc_request_common(&self, method: &str, params: &Value, rh: ResponseHandler)466 fn send_rpc_request_common(&self, method: &str, params: &Value, rh: ResponseHandler) {
467 let id = self.0.id.fetch_add(1, Ordering::Relaxed);
468 {
469 let mut pending = self.0.pending.lock().unwrap();
470 pending.insert(id, rh);
471 }
472 if let Err(e) = self.send(&json!({
473 "id": id,
474 "method": method,
475 "params": params,
476 })) {
477 let mut pending = self.0.pending.lock().unwrap();
478 if let Some(rh) = pending.remove(&id) {
479 rh.invoke(Err(Error::Io(e)));
480 }
481 }
482 }
483
handle_response(&self, id: u64, resp: Result<Value, Error>)484 fn handle_response(&self, id: u64, resp: Result<Value, Error>) {
485 let id = id as usize;
486 let handler = {
487 let mut pending = self.0.pending.lock().unwrap();
488 pending.remove(&id)
489 };
490 match handler {
491 Some(responsehandler) => responsehandler.invoke(resp),
492 None => warn!("id {} not found in pending", id),
493 }
494 }
495
496 /// Get a message from the receive queue if available.
try_get_rx(&self) -> Option<Result<RpcObject, ReadError>>497 fn try_get_rx(&self) -> Option<Result<RpcObject, ReadError>> {
498 let mut queue = self.0.rx_queue.lock().unwrap();
499 queue.pop_front()
500 }
501
502 /// Get a message from the receive queue, waiting for at most `Duration`
503 /// and returning `None` if no message is available.
get_rx_timeout(&self, dur: Duration) -> Option<Result<RpcObject, ReadError>>504 fn get_rx_timeout(&self, dur: Duration) -> Option<Result<RpcObject, ReadError>> {
505 let mut queue = self.0.rx_queue.lock().unwrap();
506 let result = self.0.rx_cvar.wait_timeout(queue, dur).unwrap();
507 queue = result.0;
508 queue.pop_front()
509 }
510
511 /// Adds a message to the receive queue. The message should only
512 /// be `None` if the read thread is exiting.
put_rx(&self, json: Result<RpcObject, ReadError>)513 fn put_rx(&self, json: Result<RpcObject, ReadError>) {
514 let mut queue = self.0.rx_queue.lock().unwrap();
515 queue.push_back(json);
516 self.0.rx_cvar.notify_one();
517 }
518
try_get_idle(&self) -> Option<usize>519 fn try_get_idle(&self) -> Option<usize> {
520 self.0.idle_queue.lock().unwrap().pop_front()
521 }
522
523 /// Checks status of the most imminent timer. If that timer has expired,
524 /// returns `Some(Ok(_))`, with the corresponding token.
525 /// If a timer exists but has not expired, returns `Some(Err(_))`,
526 /// with the error value being the `Duration` until the timer is ready.
527 /// Returns `None` if no timers are registered.
check_timers(&self) -> Option<Result<usize, Duration>>528 fn check_timers(&self) -> Option<Result<usize, Duration>> {
529 let mut timers = self.0.timers.lock().unwrap();
530 match timers.peek() {
531 None => return None,
532 Some(t) => {
533 let now = Instant::now();
534 if t.fire_after > now {
535 return Some(Err(t.fire_after - now));
536 }
537 }
538 }
539 Some(Ok(timers.pop().unwrap().token))
540 }
541
542 /// send disconnect error to pending requests.
disconnect(&self)543 fn disconnect(&self) {
544 let mut pending = self.0.pending.lock().unwrap();
545 let ids = pending.keys().cloned().collect::<Vec<_>>();
546 for id in &ids {
547 let callback = pending.remove(id).unwrap();
548 callback.invoke(Err(Error::PeerDisconnect));
549 }
550 self.0.needs_exit.store(true, Ordering::Relaxed);
551 }
552
553 /// Returns `true` if an error has occured in the main thread.
needs_exit(&self) -> bool554 fn needs_exit(&self) -> bool {
555 self.0.needs_exit.load(Ordering::Relaxed)
556 }
557
reset_needs_exit(&self)558 fn reset_needs_exit(&self) {
559 self.0.needs_exit.store(false, Ordering::SeqCst);
560 }
561 }
562
563 impl Clone for Box<dyn Peer> {
clone(&self) -> Box<dyn Peer>564 fn clone(&self) -> Box<dyn Peer> {
565 self.box_clone()
566 }
567 }
568
569 impl<W: Write> Clone for RawPeer<W> {
clone(&self) -> Self570 fn clone(&self) -> Self {
571 RawPeer(self.0.clone())
572 }
573 }
574
575 //NOTE: for our timers to work with Rust's BinaryHeap we want to reverse
576 //the default comparison; smaller `Instant`'s are considered 'greater'.
577 impl Ord for Timer {
cmp(&self, other: &Timer) -> cmp::Ordering578 fn cmp(&self, other: &Timer) -> cmp::Ordering {
579 other.fire_after.cmp(&self.fire_after)
580 }
581 }
582
583 impl PartialOrd for Timer {
partial_cmp(&self, other: &Timer) -> Option<cmp::Ordering>584 fn partial_cmp(&self, other: &Timer) -> Option<cmp::Ordering> {
585 Some(self.cmp(other))
586 }
587 }
588
#[cfg(test)]
mod tests {
    use super::*;

    /// A message with a method but no id parses as a notification.
    #[test]
    fn test_parse_notif() {
        let reader = MessageReader::default();
        let msg = reader.parse(r#"{"method": "hi", "params": {"words": "plz"}}"#).unwrap();
        assert!(!msg.is_response());
        let is_notification = match msg.into_rpc::<Value, Value>().unwrap() {
            Call::Notification(_) => true,
            _ => false,
        };
        assert!(is_notification, "parse failed");
    }

    /// A message with both an id and a method parses as a request.
    #[test]
    fn test_parse_req() {
        let reader = MessageReader::default();
        let msg =
            reader.parse(r#"{"id": 5, "method": "hi", "params": {"words": "plz"}}"#).unwrap();
        assert!(!msg.is_response());
        let is_request = match msg.into_rpc::<Value, Value>().unwrap() {
            Call::Request(..) => true,
            _ => false,
        };
        assert!(is_request, "parse failed");
    }

    /// Malformed JSON and non-object JSON are rejected with distinct errors.
    #[test]
    fn test_parse_bad_json() {
        let reader = MessageReader::default();

        // missing "" around params
        let malformed =
            reader.parse(r#"{"id": 5, "method": "hi", params: {"words": "plz"}}"#).err().unwrap();
        let is_json_error = match malformed {
            ReadError::Json(..) => true,
            _ => false,
        };
        assert!(is_json_error, "parse failed");

        // not an object
        let not_object = reader.parse(r#"[5, "hi", {"arg": "val"}]"#).err().unwrap();
        let is_not_object = match not_object {
            ReadError::NotObject => true,
            _ => false,
        };
        assert!(is_not_object, "parse failed");
    }
}
638