1 // Copyright (c) 2018 Kenton Varda and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 #include "json-rpc.h"
23 #include <kj/compat/http.h>
24 #include <capnp/compat/json-rpc.capnp.h>
25 
26 namespace capnp {
27 
28 static constexpr uint64_t JSON_NAME_ANNOTATION_ID = 0xfa5b1fd61c2e7c3dull;
29 static constexpr uint64_t JSON_NOTIFICATION_ANNOTATION_ID = 0xa0a054dea32fd98cull;
30 
31 class JsonRpc::CapabilityImpl final: public DynamicCapability::Server {
32 public:
CapabilityImpl(JsonRpc & parent,InterfaceSchema schema)33   CapabilityImpl(JsonRpc& parent, InterfaceSchema schema)
34       : DynamicCapability::Server(schema), parent(parent) {}
35 
call(InterfaceSchema::Method method,CallContext<DynamicStruct,DynamicStruct> context)36   kj::Promise<void> call(InterfaceSchema::Method method,
37                          CallContext<DynamicStruct, DynamicStruct> context) override {
38     auto proto = method.getProto();
39     bool isNotification = false;
40     kj::StringPtr name = proto.getName();
41     for (auto annotation: proto.getAnnotations()) {
42       switch (annotation.getId()) {
43         case JSON_NAME_ANNOTATION_ID:
44           name = annotation.getValue().getText();
45           break;
46         case JSON_NOTIFICATION_ANNOTATION_ID:
47           isNotification = true;
48           break;
49       }
50     }
51 
52     capnp::MallocMessageBuilder message;
53     auto value = message.getRoot<json::Value>();
54     auto list = value.initObject(3 + !isNotification);
55 
56     uint index = 0;
57 
58     auto jsonrpc = list[index++];
59     jsonrpc.setName("jsonrpc");
60     jsonrpc.initValue().setString("2.0");
61 
62     uint callId = parent.callCount++;
63 
64     if (!isNotification) {
65       auto id = list[index++];
66       id.setName("id");
67       id.initValue().setNumber(callId);
68     }
69 
70     auto methodName = list[index++];
71     methodName.setName("method");
72     methodName.initValue().setString(name);
73 
74     auto params = list[index++];
75     params.setName("params");
76     parent.codec.encode(context.getParams(), params.initValue());
77 
78     auto writePromise = parent.queueWrite(parent.codec.encode(value));
79 
80     if (isNotification) {
81       auto sproto = context.getResultsType().getProto().getStruct();
82       MessageSize size { sproto.getDataWordCount(), sproto.getPointerCount() };
83       context.initResults(size);
84       return kj::mv(writePromise);
85     } else {
86       auto paf = kj::newPromiseAndFulfiller<void>();
87       parent.awaitedResponses.insert(callId, AwaitedResponse { context, kj::mv(paf.fulfiller) });
88       auto promise = writePromise.then([p = kj::mv(paf.promise)]() mutable { return kj::mv(p); });
89       auto& parentRef = parent;
90       return promise.attach(kj::defer([&parentRef,callId]() {
91         parentRef.awaitedResponses.erase(callId);
92       }));
93     }
94   }
95 
96 private:
97   JsonRpc& parent;
98 };
99 
JsonRpc(Transport & transport,DynamicCapability::Client interface)100 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interface)
101     : JsonRpc(transport, kj::mv(interface), kj::newPromiseAndFulfiller<void>()) {}
JsonRpc(Transport & transport,DynamicCapability::Client interfaceParam,kj::PromiseFulfillerPair<void> paf)102 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interfaceParam,
103                  kj::PromiseFulfillerPair<void> paf)
104     : transport(transport),
105       interface(kj::mv(interfaceParam)),
106       errorPromise(paf.promise.fork()),
107       errorFulfiller(kj::mv(paf.fulfiller)),
108       readTask(readLoop().eagerlyEvaluate([this](kj::Exception&& e) {
109         errorFulfiller->reject(kj::mv(e));
110       })),
111       tasks(*this) {
112   codec.handleByAnnotation(interface.getSchema());
113   codec.handleByAnnotation<json::RpcMessage>();
114 
115   for (auto method: interface.getSchema().getMethods()) {
116     auto proto = method.getProto();
117     kj::StringPtr name = proto.getName();
118     for (auto annotation: proto.getAnnotations()) {
119       switch (annotation.getId()) {
120         case JSON_NAME_ANNOTATION_ID:
121           name = annotation.getValue().getText();
122           break;
123       }
124     }
125     methodMap.insert(name, method);
126   }
127 }
128 
getPeer(InterfaceSchema schema)129 DynamicCapability::Client JsonRpc::getPeer(InterfaceSchema schema) {
130   codec.handleByAnnotation(interface.getSchema());
131   return kj::heap<CapabilityImpl>(*this, schema);
132 }
133 
staticHeaderTable()134 static kj::HttpHeaderTable& staticHeaderTable() {
135   static kj::HttpHeaderTable HEADER_TABLE;
136   return HEADER_TABLE;
137 }
138 
queueWrite(kj::String text)139 kj::Promise<void> JsonRpc::queueWrite(kj::String text) {
140   auto fork = writeQueue.then([this, text = kj::mv(text)]() mutable {
141     auto promise = transport.send(text);
142     return promise.attach(kj::mv(text));
143   }).eagerlyEvaluate([this](kj::Exception&& e) {
144     errorFulfiller->reject(kj::mv(e));
145   }).fork();
146   writeQueue = fork.addBranch();
147   return fork.addBranch();
148 }
149 
queueError(kj::Maybe<json::Value::Reader> id,int code,kj::StringPtr message)150 void JsonRpc::queueError(kj::Maybe<json::Value::Reader> id, int code, kj::StringPtr message) {
151   MallocMessageBuilder capnpMessage;
152   auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
153   jsonResponse.setJsonrpc("2.0");
154   KJ_IF_MAYBE(i, id) {
155     jsonResponse.setId(*i);
156   } else {
157     jsonResponse.initId().setNull();
158   }
159   auto error = jsonResponse.initError();
160   error.setCode(code);
161   error.setMessage(message);
162 
163   // OK to discard result of queueWrite() since it's just one branch of a fork.
164   queueWrite(codec.encode(jsonResponse));
165 }
166 
readLoop()167 kj::Promise<void> JsonRpc::readLoop() {
168   return transport.receive().then([this](kj::String message) -> kj::Promise<void> {
169     MallocMessageBuilder capnpMessage;
170     auto rpcMessageBuilder = capnpMessage.getRoot<json::RpcMessage>();
171 
172     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
173       codec.decode(message, rpcMessageBuilder);
174     })) {
175       queueError(nullptr, -32700, kj::str("Parse error: ", exception->getDescription()));
176       return readLoop();
177     }
178 
179     KJ_CONTEXT("decoding JSON-RPC message", message);
180 
181     auto rpcMessage = rpcMessageBuilder.asReader();
182 
183     if (!rpcMessage.hasJsonrpc()) {
184       queueError(nullptr, -32700, kj::str("Missing 'jsonrpc' field."));
185       return readLoop();
186     } else if (rpcMessage.getJsonrpc() != "2.0") {
187       queueError(nullptr, -32700,
188           kj::str("Unknown JSON-RPC version. This peer implements version '2.0'."));
189       return readLoop();
190     }
191 
192     switch (rpcMessage.which()) {
193       case json::RpcMessage::NONE:
194         queueError(nullptr, -32700, kj::str("message has none of params, result, or error"));
195         break;
196 
197       case json::RpcMessage::PARAMS: {
198         // a call
199         KJ_IF_MAYBE(method, methodMap.find(rpcMessage.getMethod())) {
200           auto req = interface.newRequest(*method);
201           KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
202             codec.decode(rpcMessage.getParams(), req);
203           })) {
204             kj::Maybe<JsonValue::Reader> id;
205             if (rpcMessage.hasId()) id = rpcMessage.getId();
206             queueError(id, -32602,
207                 kj::str("Type error in method params: ", exception->getDescription()));
208             break;
209           }
210 
211           if (rpcMessage.hasId()) {
212             auto id = rpcMessage.getId();
213             auto idCopy = kj::heapArray<word>(id.totalSize().wordCount + 1);
214             memset(idCopy.begin(), 0, idCopy.asBytes().size());
215             copyToUnchecked(id, idCopy);
216             auto idPtr = readMessageUnchecked<json::Value>(idCopy.begin());
217 
218             auto promise = req.send()
219                 .then([this,idPtr](Response<DynamicStruct> response) mutable {
220               MallocMessageBuilder capnpMessage;
221               auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
222               jsonResponse.setJsonrpc("2.0");
223               jsonResponse.setId(idPtr);
224               codec.encode(DynamicStruct::Reader(response), jsonResponse.initResult());
225               return queueWrite(codec.encode(jsonResponse));
226             }, [this,idPtr](kj::Exception&& e) {
227               MallocMessageBuilder capnpMessage;
228               auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
229               jsonResponse.setJsonrpc("2.0");
230               jsonResponse.setId(idPtr);
231               auto error = jsonResponse.initError();
232               switch (e.getType()) {
233                 case kj::Exception::Type::FAILED:
234                   error.setCode(-32000);
235                   break;
236                 case kj::Exception::Type::DISCONNECTED:
237                   error.setCode(-32001);
238                   break;
239                 case kj::Exception::Type::OVERLOADED:
240                   error.setCode(-32002);
241                   break;
242                 case kj::Exception::Type::UNIMPLEMENTED:
243                   error.setCode(-32601);  // method not found
244                   break;
245               }
246               error.setMessage(e.getDescription());
247               return queueWrite(codec.encode(jsonResponse));
248             });
249             tasks.add(promise.attach(kj::mv(idCopy)));
250           } else {
251             // No 'id', so this is a notification.
252             tasks.add(req.send().ignoreResult().catch_([](kj::Exception&& exception) {
253               if (exception.getType() != kj::Exception::Type::UNIMPLEMENTED) {
254                 KJ_LOG(ERROR, "JSON-RPC notification threw exception into the abyss", exception);
255               }
256             }));
257           }
258         } else {
259           if (rpcMessage.hasId()) {
260             queueError(rpcMessage.getId(), -32601, "Method not found");
261           } else {
262             // Ignore notification for unknown method.
263           }
264         }
265         break;
266       }
267 
268       case json::RpcMessage::RESULT: {
269         auto id = rpcMessage.getId();
270         if (!id.isNumber()) {
271           // JSON-RPC doesn't define what to do if receiving a response with an invalid id.
272           KJ_LOG(ERROR, "JSON-RPC response has invalid ID");
273         } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) {
274           KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
275             codec.decode(rpcMessage.getResult(), awaited->context.getResults());
276             awaited->fulfiller->fulfill();
277           })) {
278             // Errors always propagate from callee to caller, so we don't want to throw this error
279             // back to the server.
280             awaited->fulfiller->reject(kj::mv(*exception));
281           }
282         } else {
283           // Probably, this is the response to a call that was canceled.
284         }
285         break;
286       }
287 
288       case json::RpcMessage::ERROR: {
289         auto id = rpcMessage.getId();
290         if (id.isNull()) {
291           // Error message will be logged by KJ_CONTEXT, above.
292           KJ_LOG(ERROR, "peer reports JSON-RPC protocol error");
293         } else if (!id.isNumber()) {
294           // JSON-RPC doesn't define what to do if receiving a response with an invalid id.
295           KJ_LOG(ERROR, "JSON-RPC response has invalid ID");
296         } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) {
297           auto error = rpcMessage.getError();
298           auto code = error.getCode();
299           kj::Exception::Type type =
300               code == -32601 ? kj::Exception::Type::UNIMPLEMENTED
301                              : kj::Exception::Type::FAILED;
302           awaited->fulfiller->reject(kj::Exception(
303               type, __FILE__, __LINE__, kj::str(error.getMessage())));
304         } else {
305           // Probably, this is the response to a call that was canceled.
306         }
307         break;
308       }
309     }
310 
311     return readLoop();
312   });
313 }
314 
taskFailed(kj::Exception && exception)315 void JsonRpc::taskFailed(kj::Exception&& exception) {
316   errorFulfiller->reject(kj::mv(exception));
317 }
318 
319 // =======================================================================================
320 
ContentLengthTransport(kj::AsyncIoStream & stream)321 JsonRpc::ContentLengthTransport::ContentLengthTransport(kj::AsyncIoStream& stream)
322     : stream(stream), input(kj::newHttpInputStream(stream, staticHeaderTable())) {}
~ContentLengthTransport()323 JsonRpc::ContentLengthTransport::~ContentLengthTransport() noexcept(false) {}
324 
send(kj::StringPtr text)325 kj::Promise<void> JsonRpc::ContentLengthTransport::send(kj::StringPtr text) {
326   auto headers = kj::str("Content-Length: ", text.size(), "\r\n\r\n");
327   parts[0] = headers.asBytes();
328   parts[1] = text.asBytes();
329   return stream.write(parts).attach(kj::mv(headers));
330 }
331 
receive()332 kj::Promise<kj::String> JsonRpc::ContentLengthTransport::receive() {
333   return input->readMessage()
334       .then([](kj::HttpInputStream::Message&& message) {
335     auto promise = message.body->readAllText();
336     return promise.attach(kj::mv(message.body));
337   });
338 }
339 
340 }  // namespace capnp
341