1 // Copyright (c) 2018 Kenton Varda and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #include "json-rpc.h"
23 #include <kj/compat/http.h>
24 #include <capnp/compat/json-rpc.capnp.h>
25
26 namespace capnp {
27
// Annotation IDs that this file looks for on interface methods.

// When present on a method, the annotation's text value is used as the JSON-RPC method name
// instead of the name declared in the schema.
static constexpr uint64_t JSON_NAME_ANNOTATION_ID = 0xFA5B1FD61C2E7C3DULL;

// When present on a method, calls to it are sent as notifications (requests without an "id").
static constexpr uint64_t JSON_NOTIFICATION_ANNOTATION_ID = 0xA0A054DEA32FD98CULL;
30
31 class JsonRpc::CapabilityImpl final: public DynamicCapability::Server {
32 public:
CapabilityImpl(JsonRpc & parent,InterfaceSchema schema)33 CapabilityImpl(JsonRpc& parent, InterfaceSchema schema)
34 : DynamicCapability::Server(schema), parent(parent) {}
35
call(InterfaceSchema::Method method,CallContext<DynamicStruct,DynamicStruct> context)36 kj::Promise<void> call(InterfaceSchema::Method method,
37 CallContext<DynamicStruct, DynamicStruct> context) override {
38 auto proto = method.getProto();
39 bool isNotification = false;
40 kj::StringPtr name = proto.getName();
41 for (auto annotation: proto.getAnnotations()) {
42 switch (annotation.getId()) {
43 case JSON_NAME_ANNOTATION_ID:
44 name = annotation.getValue().getText();
45 break;
46 case JSON_NOTIFICATION_ANNOTATION_ID:
47 isNotification = true;
48 break;
49 }
50 }
51
52 capnp::MallocMessageBuilder message;
53 auto value = message.getRoot<json::Value>();
54 auto list = value.initObject(3 + !isNotification);
55
56 uint index = 0;
57
58 auto jsonrpc = list[index++];
59 jsonrpc.setName("jsonrpc");
60 jsonrpc.initValue().setString("2.0");
61
62 uint callId = parent.callCount++;
63
64 if (!isNotification) {
65 auto id = list[index++];
66 id.setName("id");
67 id.initValue().setNumber(callId);
68 }
69
70 auto methodName = list[index++];
71 methodName.setName("method");
72 methodName.initValue().setString(name);
73
74 auto params = list[index++];
75 params.setName("params");
76 parent.codec.encode(context.getParams(), params.initValue());
77
78 auto writePromise = parent.queueWrite(parent.codec.encode(value));
79
80 if (isNotification) {
81 auto sproto = context.getResultsType().getProto().getStruct();
82 MessageSize size { sproto.getDataWordCount(), sproto.getPointerCount() };
83 context.initResults(size);
84 return kj::mv(writePromise);
85 } else {
86 auto paf = kj::newPromiseAndFulfiller<void>();
87 parent.awaitedResponses.insert(callId, AwaitedResponse { context, kj::mv(paf.fulfiller) });
88 auto promise = writePromise.then([p = kj::mv(paf.promise)]() mutable { return kj::mv(p); });
89 auto& parentRef = parent;
90 return promise.attach(kj::defer([&parentRef,callId]() {
91 parentRef.awaitedResponses.erase(callId);
92 }));
93 }
94 }
95
96 private:
97 JsonRpc& parent;
98 };
99
JsonRpc(Transport & transport,DynamicCapability::Client interface)100 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interface)
101 : JsonRpc(transport, kj::mv(interface), kj::newPromiseAndFulfiller<void>()) {}
JsonRpc(Transport & transport,DynamicCapability::Client interfaceParam,kj::PromiseFulfillerPair<void> paf)102 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interfaceParam,
103 kj::PromiseFulfillerPair<void> paf)
104 : transport(transport),
105 interface(kj::mv(interfaceParam)),
106 errorPromise(paf.promise.fork()),
107 errorFulfiller(kj::mv(paf.fulfiller)),
108 readTask(readLoop().eagerlyEvaluate([this](kj::Exception&& e) {
109 errorFulfiller->reject(kj::mv(e));
110 })),
111 tasks(*this) {
112 codec.handleByAnnotation(interface.getSchema());
113 codec.handleByAnnotation<json::RpcMessage>();
114
115 for (auto method: interface.getSchema().getMethods()) {
116 auto proto = method.getProto();
117 kj::StringPtr name = proto.getName();
118 for (auto annotation: proto.getAnnotations()) {
119 switch (annotation.getId()) {
120 case JSON_NAME_ANNOTATION_ID:
121 name = annotation.getValue().getText();
122 break;
123 }
124 }
125 methodMap.insert(name, method);
126 }
127 }
128
getPeer(InterfaceSchema schema)129 DynamicCapability::Client JsonRpc::getPeer(InterfaceSchema schema) {
130 codec.handleByAnnotation(interface.getSchema());
131 return kj::heap<CapabilityImpl>(*this, schema);
132 }
133
staticHeaderTable()134 static kj::HttpHeaderTable& staticHeaderTable() {
135 static kj::HttpHeaderTable HEADER_TABLE;
136 return HEADER_TABLE;
137 }
138
queueWrite(kj::String text)139 kj::Promise<void> JsonRpc::queueWrite(kj::String text) {
140 auto fork = writeQueue.then([this, text = kj::mv(text)]() mutable {
141 auto promise = transport.send(text);
142 return promise.attach(kj::mv(text));
143 }).eagerlyEvaluate([this](kj::Exception&& e) {
144 errorFulfiller->reject(kj::mv(e));
145 }).fork();
146 writeQueue = fork.addBranch();
147 return fork.addBranch();
148 }
149
queueError(kj::Maybe<json::Value::Reader> id,int code,kj::StringPtr message)150 void JsonRpc::queueError(kj::Maybe<json::Value::Reader> id, int code, kj::StringPtr message) {
151 MallocMessageBuilder capnpMessage;
152 auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
153 jsonResponse.setJsonrpc("2.0");
154 KJ_IF_MAYBE(i, id) {
155 jsonResponse.setId(*i);
156 } else {
157 jsonResponse.initId().setNull();
158 }
159 auto error = jsonResponse.initError();
160 error.setCode(code);
161 error.setMessage(message);
162
163 // OK to discard result of queueWrite() since it's just one branch of a fork.
164 queueWrite(codec.encode(jsonResponse));
165 }
166
readLoop()167 kj::Promise<void> JsonRpc::readLoop() {
168 return transport.receive().then([this](kj::String message) -> kj::Promise<void> {
169 MallocMessageBuilder capnpMessage;
170 auto rpcMessageBuilder = capnpMessage.getRoot<json::RpcMessage>();
171
172 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
173 codec.decode(message, rpcMessageBuilder);
174 })) {
175 queueError(nullptr, -32700, kj::str("Parse error: ", exception->getDescription()));
176 return readLoop();
177 }
178
179 KJ_CONTEXT("decoding JSON-RPC message", message);
180
181 auto rpcMessage = rpcMessageBuilder.asReader();
182
183 if (!rpcMessage.hasJsonrpc()) {
184 queueError(nullptr, -32700, kj::str("Missing 'jsonrpc' field."));
185 return readLoop();
186 } else if (rpcMessage.getJsonrpc() != "2.0") {
187 queueError(nullptr, -32700,
188 kj::str("Unknown JSON-RPC version. This peer implements version '2.0'."));
189 return readLoop();
190 }
191
192 switch (rpcMessage.which()) {
193 case json::RpcMessage::NONE:
194 queueError(nullptr, -32700, kj::str("message has none of params, result, or error"));
195 break;
196
197 case json::RpcMessage::PARAMS: {
198 // a call
199 KJ_IF_MAYBE(method, methodMap.find(rpcMessage.getMethod())) {
200 auto req = interface.newRequest(*method);
201 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
202 codec.decode(rpcMessage.getParams(), req);
203 })) {
204 kj::Maybe<JsonValue::Reader> id;
205 if (rpcMessage.hasId()) id = rpcMessage.getId();
206 queueError(id, -32602,
207 kj::str("Type error in method params: ", exception->getDescription()));
208 break;
209 }
210
211 if (rpcMessage.hasId()) {
212 auto id = rpcMessage.getId();
213 auto idCopy = kj::heapArray<word>(id.totalSize().wordCount + 1);
214 memset(idCopy.begin(), 0, idCopy.asBytes().size());
215 copyToUnchecked(id, idCopy);
216 auto idPtr = readMessageUnchecked<json::Value>(idCopy.begin());
217
218 auto promise = req.send()
219 .then([this,idPtr](Response<DynamicStruct> response) mutable {
220 MallocMessageBuilder capnpMessage;
221 auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
222 jsonResponse.setJsonrpc("2.0");
223 jsonResponse.setId(idPtr);
224 codec.encode(DynamicStruct::Reader(response), jsonResponse.initResult());
225 return queueWrite(codec.encode(jsonResponse));
226 }, [this,idPtr](kj::Exception&& e) {
227 MallocMessageBuilder capnpMessage;
228 auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
229 jsonResponse.setJsonrpc("2.0");
230 jsonResponse.setId(idPtr);
231 auto error = jsonResponse.initError();
232 switch (e.getType()) {
233 case kj::Exception::Type::FAILED:
234 error.setCode(-32000);
235 break;
236 case kj::Exception::Type::DISCONNECTED:
237 error.setCode(-32001);
238 break;
239 case kj::Exception::Type::OVERLOADED:
240 error.setCode(-32002);
241 break;
242 case kj::Exception::Type::UNIMPLEMENTED:
243 error.setCode(-32601); // method not found
244 break;
245 }
246 error.setMessage(e.getDescription());
247 return queueWrite(codec.encode(jsonResponse));
248 });
249 tasks.add(promise.attach(kj::mv(idCopy)));
250 } else {
251 // No 'id', so this is a notification.
252 tasks.add(req.send().ignoreResult().catch_([](kj::Exception&& exception) {
253 if (exception.getType() != kj::Exception::Type::UNIMPLEMENTED) {
254 KJ_LOG(ERROR, "JSON-RPC notification threw exception into the abyss", exception);
255 }
256 }));
257 }
258 } else {
259 if (rpcMessage.hasId()) {
260 queueError(rpcMessage.getId(), -32601, "Method not found");
261 } else {
262 // Ignore notification for unknown method.
263 }
264 }
265 break;
266 }
267
268 case json::RpcMessage::RESULT: {
269 auto id = rpcMessage.getId();
270 if (!id.isNumber()) {
271 // JSON-RPC doesn't define what to do if receiving a response with an invalid id.
272 KJ_LOG(ERROR, "JSON-RPC response has invalid ID");
273 } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) {
274 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
275 codec.decode(rpcMessage.getResult(), awaited->context.getResults());
276 awaited->fulfiller->fulfill();
277 })) {
278 // Errors always propagate from callee to caller, so we don't want to throw this error
279 // back to the server.
280 awaited->fulfiller->reject(kj::mv(*exception));
281 }
282 } else {
283 // Probably, this is the response to a call that was canceled.
284 }
285 break;
286 }
287
288 case json::RpcMessage::ERROR: {
289 auto id = rpcMessage.getId();
290 if (id.isNull()) {
291 // Error message will be logged by KJ_CONTEXT, above.
292 KJ_LOG(ERROR, "peer reports JSON-RPC protocol error");
293 } else if (!id.isNumber()) {
294 // JSON-RPC doesn't define what to do if receiving a response with an invalid id.
295 KJ_LOG(ERROR, "JSON-RPC response has invalid ID");
296 } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) {
297 auto error = rpcMessage.getError();
298 auto code = error.getCode();
299 kj::Exception::Type type =
300 code == -32601 ? kj::Exception::Type::UNIMPLEMENTED
301 : kj::Exception::Type::FAILED;
302 awaited->fulfiller->reject(kj::Exception(
303 type, __FILE__, __LINE__, kj::str(error.getMessage())));
304 } else {
305 // Probably, this is the response to a call that was canceled.
306 }
307 break;
308 }
309 }
310
311 return readLoop();
312 });
313 }
314
taskFailed(kj::Exception && exception)315 void JsonRpc::taskFailed(kj::Exception&& exception) {
316 errorFulfiller->reject(kj::mv(exception));
317 }
318
319 // =======================================================================================
320
ContentLengthTransport(kj::AsyncIoStream & stream)321 JsonRpc::ContentLengthTransport::ContentLengthTransport(kj::AsyncIoStream& stream)
322 : stream(stream), input(kj::newHttpInputStream(stream, staticHeaderTable())) {}
~ContentLengthTransport()323 JsonRpc::ContentLengthTransport::~ContentLengthTransport() noexcept(false) {}
324
send(kj::StringPtr text)325 kj::Promise<void> JsonRpc::ContentLengthTransport::send(kj::StringPtr text) {
326 auto headers = kj::str("Content-Length: ", text.size(), "\r\n\r\n");
327 parts[0] = headers.asBytes();
328 parts[1] = text.asBytes();
329 return stream.write(parts).attach(kj::mv(headers));
330 }
331
receive()332 kj::Promise<kj::String> JsonRpc::ContentLengthTransport::receive() {
333 return input->readMessage()
334 .then([](kj::HttpInputStream::Message&& message) {
335 auto promise = message.body->readAllText();
336 return promise.attach(kj::mv(message.body));
337 });
338 }
339
340 } // namespace capnp
341