1 /*
2 * This file is part of yacas.
3 * Yacas is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Lesset General Public License as
5 * published by the Free Software Foundation, either version 2.1
6 * of the License, or (at your option) any later version.
7 *
8 * Yacas is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with yacas. If not, see <http://www.gnu.org/licenses/>.
15 *
16 */
17
18 /*
19 * File: yacas_kernel.cpp
20 * Author: mazur
21 *
22 * Created on November 6, 2015, 3:10 PM
23 */
24
25 #include "yacas_kernel.hpp"
26 #include "base64.hpp"
27
28 #include "yacas/yacas_version.h"
29
30 #include <boost/date_time/posix_time/posix_time.hpp>
31 #include <boost/uuid/uuid_io.hpp>
32
33 #include <fstream>
34 #include <iostream>
35 #include <regex>
36 #include <set>
37 #include <string>
38
39 namespace {
now()40 std::string now()
41 {
42 using namespace boost::posix_time;
43
44 return to_iso_extended_string(microsec_clock::local_time());
45 }
46 }
47
YacasKernel(const std::string & scripts_path,const Json::Value & config)48 YacasKernel::YacasKernel(const std::string& scripts_path,
49 const Json::Value& config) :
50 _session(config["key"].asString()),
51 _hb_socket(_ctx, zmqpp::socket_type::reply),
52 _iopub_socket(_ctx, zmqpp::socket_type::publish),
53 _control_socket(_ctx, zmqpp::socket_type::router),
54 _stdin_socket(_ctx, zmqpp::socket_type::router),
55 _shell_socket(_ctx, zmqpp::socket_type::router),
56 _engine_socket(_ctx, zmqpp::socket_type::pair),
57 _execution_count(1),
58 _engine(scripts_path, _ctx, "inproc://engine"),
59 _tex_output(true),
60 _yacas(_side_effects),
61 _shutdown(false)
62 {
63 const std::string transport = config["transport"].asString();
64 const std::string ip = config["ip"].asString();
65
66 _hb_socket.bind(transport + "://" + ip + ":" +
67 config["hb_port"].asString());
68 _iopub_socket.bind(transport + "://" + ip + ":" +
69 config["iopub_port"].asString());
70 _control_socket.bind(transport + "://" + ip + ":" +
71 config["control_port"].asString());
72 _stdin_socket.bind(transport + "://" + ip + ":" +
73 config["stdin_port"].asString());
74 _shell_socket.bind(transport + "://" + ip + ":" +
75 config["shell_port"].asString());
76 _engine_socket.bind("inproc://engine");
77
78 _yacas.Evaluate(std::string("DefaultDirectory(\"") + scripts_path +
79 std::string("\");"));
80 _yacas.Evaluate("Load(\"yacasinit.ys\");");
81 }
82
run()83 void YacasKernel::run()
84 {
85 zmqpp::poller poller;
86
87 poller.add(_hb_socket);
88 poller.add(_control_socket);
89 poller.add(_stdin_socket);
90 poller.add(_shell_socket);
91 poller.add(_iopub_socket);
92 poller.add(_engine_socket);
93
94 for (;;) {
95 poller.poll();
96
97 if (poller.has_input(_hb_socket)) {
98 zmqpp::message msg;
99 _hb_socket.receive(msg);
100 _hb_socket.send(msg);
101 }
102
103 if (poller.has_input(_shell_socket)) {
104 zmqpp::message msg;
105 _shell_socket.receive(msg);
106 _handle_shell(std::make_shared<Request>(_session, msg));
107 }
108
109 if (poller.has_input(_control_socket)) {
110 zmqpp::message msg;
111 _control_socket.receive(msg);
112 Request request(_session, msg);
113
114 if (request.header()["msg_type"].asString() == "shutdown_request")
115 _shutdown = true;
116 }
117
118 if (_shutdown)
119 return;
120
121 if (poller.has_input(_stdin_socket)) {
122 zmqpp::message msg;
123 _stdin_socket.receive(msg);
124 }
125
126 if (poller.has_input(_engine_socket)) {
127 zmqpp::message msg;
128 _engine_socket.receive(msg);
129 _handle_engine(msg);
130 }
131 }
132 }
133
Session(const std::string & key)134 YacasKernel::Session::Session(const std::string& key) :
135 _auth(key),
136 _uuid(_uuid_gen())
137 {
138 }
139
Request(const Session & session,const zmqpp::message & msg)140 YacasKernel::Request::Request(const Session& session,
141 const zmqpp::message& msg) :
142 _session(session)
143 {
144 std::string header_buf;
145 msg.get(header_buf, 3);
146 std::string parent_header_buf;
147 msg.get(parent_header_buf, 4);
148 std::string metadata_buf;
149 msg.get(metadata_buf, 5);
150 std::string content_buf;
151 msg.get(content_buf, 6);
152
153 HMAC_SHA256 auth(_session.auth());
154
155 auth.update(header_buf);
156 auth.update(parent_header_buf);
157 auth.update(metadata_buf);
158 auth.update(content_buf);
159
160 std::string signature_buf;
161 msg.get(signature_buf, 2);
162
163 if (auth.hexdigest() != signature_buf)
164 throw std::runtime_error("invalid signature");
165
166 msg.get(_identities_buf, 0);
167
168 Json::Reader reader;
169
170 reader.parse(header_buf, _header);
171 reader.parse(content_buf, _content);
172 reader.parse(metadata_buf, _metadata);
173 }
174
reply(zmqpp::socket & socket,const std::string & msg_type,const Json::Value & content) const175 void YacasKernel::Request::reply(zmqpp::socket& socket,
176 const std::string& msg_type,
177 const Json::Value& content) const
178 {
179 Json::Value header;
180 header["username"] = "kernel";
181 header["version"] = "5.0";
182 header["session"] = boost::uuids::to_string(_session.uuid());
183 header["date"] = now();
184 header["msg_id"] = boost::uuids::to_string(_session.generate_msg_uuid());
185 header["msg_type"] = msg_type;
186
187 Json::StreamWriterBuilder builder;
188
189 const std::string content_buf = Json::writeString(builder, content);
190 // FIXME:
191 const std::string metadata_buf = "{}";
192 const std::string header_buf = Json::writeString(builder, header);
193 const std::string parent_header_buf = Json::writeString(builder, _header);
194
195 HMAC_SHA256 auth(_session.auth());
196
197 auth.update(header_buf);
198 auth.update(parent_header_buf);
199 auth.update(metadata_buf);
200 auth.update(content_buf);
201
202 zmqpp::message msg;
203 msg.add(_identities_buf);
204 msg.add("<IDS|MSG>");
205 msg.add(auth.hexdigest());
206 msg.add(header_buf);
207 msg.add(parent_header_buf);
208 msg.add(metadata_buf);
209 msg.add(content_buf);
210
211 socket.send(msg);
212 }
213
_handle_shell(const std::shared_ptr<Request> & request)214 void YacasKernel::_handle_shell(const std::shared_ptr<Request>& request)
215 {
216 const std::string msg_type = request->header()["msg_type"].asString();
217
218 if (msg_type == "kernel_info_request") {
219 Json::Value language_info;
220 language_info["name"] = "yacas";
221 language_info["version"] = YACAS_VERSION;
222 language_info["mimetype"] = "text/x-yacas";
223 language_info["file_extension"] = ".ys";
224
225 Json::Value homepage;
226 homepage["text"] = "Yacas Homepage";
227 homepage["url"] = "http://www.yacas.org";
228
229 Json::Value docs;
230 docs["text"] = "Yacas Documentation";
231 docs["url"] = "http://yacas.readthedocs.org";
232
233 Json::Value help_links;
234 help_links.append(homepage);
235 help_links.append(docs);
236
237 Json::Value reply_content;
238 reply_content["protocol_version"] = "5.2";
239 reply_content["implementation"] = "yacas_kernel";
240 reply_content["implementation_version"] = "0.2";
241 reply_content["language_info"] = language_info;
242 reply_content["banner"] = "yacas_kernel " YACAS_VERSION;
243 reply_content["help_links"] = help_links;
244
245 request->reply(_shell_socket, "kernel_info_reply", reply_content);
246 } else if (msg_type == "execute_request") {
247
248 _execute_requests.insert(
249 std::make_pair(_execution_count, std::move(request)));
250 _engine.submit(_execution_count, request->content()["code"].asString());
251
252 _execution_count += 1;
253 } else if (msg_type == "complete_request") {
254 std::string code = request->content()["code"].asString();
255 int cursor = request->content()["cursor_pos"].asInt();
256 int start = cursor;
257 while (start > 0 && std::isalpha(code[start - 1]))
258 start -= 1;
259 const std::string prefix = code.substr(start, cursor - start);
260
261 std::set<std::string> matches;
262
263 for (auto op : _yacas.getDefEnv().getEnv().PreFix())
264 if (op.first->compare(0, prefix.length(), prefix) == 0)
265 matches.insert(*op.first);
266
267 for (auto op : _yacas.getDefEnv().getEnv().InFix())
268 if (op.first->compare(0, prefix.length(), prefix) == 0)
269 matches.insert(*op.first);
270
271 for (auto op : _yacas.getDefEnv().getEnv().PostFix())
272 if (op.first->compare(0, prefix.length(), prefix) == 0)
273 matches.insert(*op.first);
274
275 for (auto op : _yacas.getDefEnv().getEnv().Bodied())
276 if (op.first->compare(0, prefix.length(), prefix) == 0)
277 matches.insert(*op.first);
278
279 for (auto op : _yacas.getDefEnv().getEnv().CoreCommands())
280 if (op.first->compare(0, prefix.length(), prefix) == 0)
281 matches.insert(*op.first);
282
283 for (auto op : _yacas.getDefEnv().getEnv().UserFunctions())
284 if (op.first->compare(0, prefix.length(), prefix) == 0)
285 matches.insert(*op.first);
286
287 Json::Value reply_content_matches;
288 for (const std::string& match : matches)
289 reply_content_matches.append(match);
290
291 Json::Value reply_content;
292 reply_content["status"] = "ok";
293 reply_content["cursor_start"] = start;
294 reply_content["cursor_end"] = cursor;
295 reply_content["matches"] = reply_content_matches;
296
297 request->reply(_shell_socket, "complete_reply", reply_content);
298
299 } else if (msg_type == "shutdown_request") {
300
301 _shutdown = true;
302
303 Json::Value reply_content;
304 reply_content["status"] = "ok";
305
306 request->reply(_shell_socket, "shutdown_reply", reply_content);
307 }
308 }
309
_handle_engine(const zmqpp::message & msg)310 void YacasKernel::_handle_engine(const zmqpp::message& msg)
311 {
312 std::string msg_type;
313 msg.get(msg_type, 0);
314
315 std::string content_buf;
316 msg.get(content_buf, 1);
317
318 Json::Value content;
319 Json::Reader().parse(content_buf, content);
320
321 std::shared_ptr<YacasKernel::Request> request =
322 _execute_requests[content["id"].asUInt64()];
323
324 bool condemned = false;
325
326 if (msg_type == "calculate") {
327 Json::Value status_content;
328 status_content["execution_state"] = "busy";
329
330 request->reply(_iopub_socket, "status", status_content);
331
332 Json::Value execute_input_content;
333 execute_input_content["execution_count"] = request->content()["id"];
334 execute_input_content["code"] = request->content()["expr"];
335
336 request->reply(_iopub_socket, "execute_input", execute_input_content);
337 } else if (msg_type == "result") {
338
339 if (content.isMember("side_effects")) {
340 Json::Value stream_content;
341 stream_content["name"] = "stdout";
342 stream_content["text"] = content["side_effects"];
343
344 request->reply(_iopub_socket, "stream", stream_content);
345 }
346
347 if (content.isMember("error")) {
348 Json::Value reply_content;
349 reply_content["status"] = "error";
350 reply_content["execution_count"] = content["id"];
351 reply_content["ename"] = Json::Value();
352 reply_content["evalue"] = Json::Value();
353 reply_content["traceback"].append(content["error"]);
354
355 request->reply(_shell_socket, "execute_reply", reply_content);
356
357 Json::Value error_content;
358 error_content["execution_count"] = content["id"];
359 error_content["ename"] = Json::Value();
360 error_content["evalue"] = Json::Value();
361 error_content["traceback"].append(content["error"]);
362
363 request->reply(_iopub_socket, "error", error_content);
364 } else {
365 std::string text_result = content["result"].asString();
366 if (text_result.back() == ';')
367 text_result.pop_back();
368
369 Json::Value content_data;
370 content_data["text/plain"] = text_result;
371
372 std::regex rx("File\\(\"([^\"]+)\", *\"([^\"]+)\"\\)",
373 std::regex_constants::ECMAScript);
374 std::smatch m;
375 if (std::regex_match(text_result, m, rx)) {
376 std::ifstream f(m[1],
377 std::ios_base::in | std::ios_base::binary);
378 const std::vector<unsigned char> img(
379 (std::istreambuf_iterator<char>(f)),
380 std::istreambuf_iterator<char>());
381 content_data[m[2]] = base64_encode(img);
382 } else {
383 if (_tex_output) {
384 _side_effects.clear();
385 _side_effects.str("");
386
387 _yacas.Evaluate(std::string("TeXForm(Hold(") + text_result +
388 "));");
389
390 std::string tex_result = _yacas.Result();
391 tex_result = tex_result.substr(1, tex_result.size() - 3);
392
393 content_data["text/latex"] = tex_result;
394 }
395 }
396
397 Json::Value reply_content;
398 reply_content["status"] = "ok";
399 reply_content["execution_count"] = content["id"];
400 reply_content["data"] = content_data;
401
402 request->reply(_shell_socket, "execute_result", reply_content);
403
404 Json::Value result_content;
405 result_content["execution_count"] = content["id"];
406 result_content["data"] = content_data;
407 result_content["metadata"] = "{}";
408
409 request->reply(_iopub_socket, "execute_result", result_content);
410
411 condemned = true;
412 }
413
414 Json::Value status_content;
415 status_content["execution_state"] = "idle";
416
417 request->reply(_iopub_socket, "status", status_content);
418
419 if (condemned)
420 _execute_requests.erase(content["id"].asUInt64());
421 }
422 }
423