1 // Copyright 2017-2019 VMware, Inc.
2 // SPDX-License-Identifier: BSD-2-Clause
3 //
4 // The BSD-2 license (the License) set forth below applies to all parts of the
5 // Cascade project. You may not use this file except in compliance with the
6 // License.
7 //
8 // BSD-2 License
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are met:
12 //
13 // 1. Redistributions of source code must retain the above copyright notice, this
14 // list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright notice,
17 // this list of conditions and the following disclaimer in the documentation
18 // and/or other materials provided with the distribution.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS AS IS AND
21 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31 #include "target/compiler/remote_compiler.h"
32
33 #include <cassert>
34 #include <unordered_map>
35 #include "common/log.h"
36 #include "common/sockserver.h"
37 #include "common/sockstream.h"
38 #include "target/compiler/remote_interface.h"
39 #include "target/engine.h"
40 #include "target/state.h"
41 #include "verilog/ast/ast.h"
42 #include "verilog/parse/parser.h"
43
44 using namespace std;
45
46 namespace cascade {
47
RemoteCompiler()48 RemoteCompiler::RemoteCompiler() : Compiler(), Thread() {
49 set_path("/tmp/fpga_socket");
50 set_port(8800);
51
52 sock_ = nullptr;
53 }
54
~RemoteCompiler()55 RemoteCompiler::~RemoteCompiler() {
56 Compiler::stop_async();
57 }
58
schedule_state_safe_interrupt(Runtime::Interrupt int_)59 void RemoteCompiler::schedule_state_safe_interrupt(Runtime::Interrupt int_) {
60 lock_guard<mutex> lg(slock_);
61
62 // Send a state safe begin request to every registered compiler and wait
63 // for them to reply with a state safe okay
64 for (const auto& si : sock_index_) {
65 if (si.first == -1) {
66 continue;
67 }
68 auto* asock = socks_[si.first];
69 Rpc(Rpc::Type::STATE_SAFE_BEGIN).serialize(*asock);
70 asock->flush();
71 Rpc res;
72 res.deserialize(*asock);
73 assert(res.type_ == Rpc::Type::STATE_SAFE_OKAY);
74 }
75
76 // We now have every known instance of cascade either blocked in a state safe
77 // window or reporting that it's executed its finish statement.
78 int_();
79
80 // Send a state safe finish response to every known instance of cascade
81 for (const auto& si : sock_index_) {
82 if (si.first == -1) {
83 continue;
84 }
85 auto* asock = socks_[si.first];
86 Rpc(Rpc::Type::STATE_SAFE_FINISH).serialize(*asock);
87 asock->flush();
88 }
89 }
90
get_interface(const std::string & loc)91 Interface* RemoteCompiler::get_interface(const std::string& loc) {
92 if (loc != "remote") {
93 return nullptr;
94 }
95 if (sock_ == nullptr) {
96 return nullptr;
97 }
98 return new RemoteInterface(sock_);
99 }
100
set_path(const string & p)101 RemoteCompiler& RemoteCompiler::set_path(const string& p) {
102 path_ = p;
103 return *this;
104 }
105
set_port(uint32_t p)106 RemoteCompiler& RemoteCompiler::set_port(uint32_t p) {
107 port_ = p;
108 return *this;
109 }
110
run_logic()111 void RemoteCompiler::run_logic() {
112 sockserver tl(port_, 8);
113 sockserver ul(path_.c_str(), 8);
114 if (tl.error() || ul.error()) {
115 return;
116 }
117
118 fd_set master_set;
119 FD_ZERO(&master_set);
120 FD_SET(tl.descriptor(), &master_set);
121 FD_SET(ul.descriptor(), &master_set);
122
123 fd_set read_set;
124 FD_ZERO(&read_set);
125
126 struct timeval timeout = {1, 0};
127 auto max_fd = max(tl.descriptor(), ul.descriptor());
128
129 pool_.set_num_threads(4);
130 pool_.run();
131
132 while (!stop_requested()) {
133 read_set = master_set;
134 select(max_fd+1, &read_set, nullptr, nullptr, &timeout);
135 for (auto i = 0; i <= max_fd; ++i) {
136
137 // Not ready; nothing to do
138 if (!FD_ISSET(i, &read_set)) {
139 continue;
140 }
141
142 // Listener logic: New connections are added to the read set Note that
143 // this is a write critical section for sockets so it is guarded against
144 // race conditions with the state safe interrupt handler.
145 if ((i == tl.descriptor()) || (i == ul.descriptor())) {
146 lock_guard<mutex> lg(slock_);
147 auto* sock = (i == tl.descriptor()) ? tl.accept() : ul.accept();
148 const auto fd = sock->descriptor();
149 FD_SET(fd, &master_set);
150 if (fd > max_fd) {
151 max_fd = fd;
152 socks_.resize(max_fd+1, nullptr);
153 }
154 socks_[fd] = sock;
155 continue;
156 }
157
158 // Client: Grab the socket associated with this fd and handle the request
159 // Note that this is a read, which can't interfere with the state safe
160 // interrupt handler and thus doesn't need to be guarded.
161 auto* sock = socks_[i];
162 do {
163 Rpc rpc;
164 rpc.deserialize(*sock);
165 switch (rpc.type_) {
166
167 // Compiler ABI: Note that these methods remove elements from the
168 // sock index which aren't used anywhere else and thus don't need to
169 // be guarded.
170 case Rpc::Type::COMPILE: {
171 compile(sock, rpc);
172 sock = nullptr;
173 socks_[i] = nullptr;
174 FD_CLR(i, &master_set);
175 break;
176 }
177 case Rpc::Type::STOP_COMPILE: {
178 stop_compile(sock, rpc);
179 sock = nullptr;
180 socks_[i] = nullptr;
181 FD_CLR(i, &master_set);
182 break;
183 }
184
185 // Core ABI:
186 case Rpc::Type::GET_STATE:
187 get_state(sock, get_engine(rpc));
188 break;
189 case Rpc::Type::SET_STATE:
190 set_state(sock, get_engine(rpc));
191 break;
192 case Rpc::Type::GET_INPUT:
193 get_input(sock, get_engine(rpc));
194 break;
195 case Rpc::Type::SET_INPUT:
196 set_input(sock, get_engine(rpc));
197 break;
198 case Rpc::Type::FINALIZE:
199 finalize(sock, get_engine(rpc));
200 break;
201 case Rpc::Type::OVERRIDES_DONE_STEP:
202 overrides_done_step(sock, get_engine(rpc));
203 break;
204 case Rpc::Type::DONE_STEP:
205 done_step(sock, get_engine(rpc));
206 break;
207 case Rpc::Type::OVERRIDES_DONE_SIMULATION:
208 overrides_done_simulation(sock, get_engine(rpc));
209 break;
210 case Rpc::Type::DONE_SIMULATION:
211 done_simulation(sock, get_engine(rpc));
212 break;
213 case Rpc::Type::READ:
214 read(sock, get_engine(rpc));
215 break;
216 case Rpc::Type::EVALUATE:
217 evaluate(sock, get_engine(rpc));
218 break;
219 case Rpc::Type::THERE_ARE_UPDATES:
220 there_are_updates(sock, get_engine(rpc));
221 break;
222 case Rpc::Type::UPDATE:
223 update(sock, get_engine(rpc));
224 break;
225 case Rpc::Type::THERE_WERE_TASKS:
226 there_were_tasks(sock, get_engine(rpc));
227 break;
228 case Rpc::Type::CONDITIONAL_UPDATE:
229 conditional_update(sock, get_engine(rpc));
230 break;
231 case Rpc::Type::OPEN_LOOP:
232 open_loop(sock, get_engine(rpc));
233 break;
234
235 // Proxy Compiler Codes:
236 case Rpc::Type::OPEN_CONN_1: {
237 lock_guard<mutex> lg(slock_);
238 open_conn_1(sock, rpc);
239 FD_CLR(sock->descriptor(), &master_set);
240 break;
241 }
242 case Rpc::Type::OPEN_CONN_2: {
243 lock_guard<mutex> lg(slock_);
244 open_conn_2(sock, rpc);
245 break;
246 }
247 case Rpc::Type::CLOSE_CONN: {
248 lock_guard<mutex> lg(slock_);
249 sock = nullptr;
250 delete socks_[sock_index_[rpc.pid_].first];
251 delete socks_[sock_index_[rpc.pid_].second];
252 socks_[sock_index_[rpc.pid_].first] = nullptr;
253 socks_[sock_index_[rpc.pid_].second] = nullptr;
254 FD_CLR(sock_index_[rpc.pid_].second, &master_set);
255 sock_index_[rpc.pid_] = make_pair(-1,-1);
256 break;
257 }
258
259 // Proxy Core Codes:
260 case Rpc::Type::TEARDOWN_ENGINE:
261 teardown_engine(sock, rpc);
262 break;
263
264 // Control reaches here innocuosly when fds are closed remotely
265 default:
266 break;
267 }
268 } while ((sock != nullptr) && (sock->rdbuf()->in_avail() > 0));
269 }
270 }
271
272 // Stop all asynchronous compilation threads.
273 Compiler::stop_compile();
274 pool_.stop_now();
275
276 // We have exclusive access to the indices. Delete their contents.
277 for (auto& es : engines_) {
278 for (auto* e : es) {
279 if (e != nullptr) {
280 delete e;
281 }
282 }
283 }
284 engines_.clear();
285 for (auto* s : socks_) {
286 if (s != nullptr) {
287 delete s;
288 }
289 }
290 socks_.clear();
291 }
292
compile(sockstream * sock,const Rpc & rpc)293 void RemoteCompiler::compile(sockstream* sock, const Rpc& rpc) {
294 // Read the module declaration in the request
295 Log log;
296 Parser p(&log);
297 p.parse(*sock);
298 assert(!log.error());
299 assert((*p.begin())->is(Node::Tag::module_declaration));
300 auto* md = static_cast<ModuleDeclaration*>(*p.begin());
301
302 // Add a new entry to the engine table if necessary
303 auto eid = 0;
304 { lock_guard<mutex> lg(elock_);
305 if (rpc.pid_ >= engine_index_.size()) {
306 engine_index_.resize(rpc.pid_+1);
307 }
308 if (rpc.eid_ >= engine_index_[rpc.pid_].size()) {
309 engine_index_[rpc.pid_].resize(rpc.eid_+1, -1);
310 }
311 engine_index_[rpc.pid_][rpc.eid_] = engines_.size();
312 eid = engine_index_[rpc.pid_][rpc.eid_];
313 engines_.resize(engines_.size()+1);
314 }
315
316 // Now create a new thread to compile the code, enter it into the
317 // engine table, and close the socket when it's done.
318 pool_.insert([this, sock, rpc, md, eid]{
319 // TODO(eschkufz) Race condition here between when we set sock_ and when
320 // it's read. Also, note the unguarded access to sock_index_
321 sock_ = socks_[sock_index_[rpc.pid_].second];
322 assert(sock_ != nullptr);
323 auto* e = Compiler::compile(eid, md);
324
325 if (e != nullptr) {
326 { lock_guard<mutex> lg(elock_);
327 engines_[eid].push_back(e);
328 Rpc(Rpc::Type::OKAY, rpc.pid_, rpc.eid_, engines_[eid].size()-1).serialize(*sock);
329 }
330 sock->flush();
331 } else {
332 Rpc(Rpc::Type::FAIL).serialize(*sock);
333 sock->flush();
334 }
335 delete sock;
336 });
337 }
338
stop_compile(sockstream * sock,const Rpc & rpc)339 void RemoteCompiler::stop_compile(sockstream* sock, const Rpc& rpc) {
340 auto eid = 0;
341 { lock_guard<mutex> lg(elock_);
342 if ((rpc.pid_ >= engine_index_.size()) || (rpc.eid_ >= engine_index_[rpc.pid_].size())) {
343 eid = -1;
344 } else {
345 eid = engine_index_[rpc.pid_][rpc.eid_];
346 }
347 }
348 if (eid != -1) {
349 Compiler::stop_compile(eid);
350 }
351 Rpc(Rpc::Type::OKAY).serialize(*sock);
352 sock->flush();
353 delete sock;
354 }
355
get_state(sockstream * sock,Engine * e)356 void RemoteCompiler::get_state(sockstream* sock, Engine* e) {
357 auto* s = e->get_state();
358 s->serialize(*sock);
359 delete s;
360 sock->flush();
361 }
362
set_state(sockstream * sock,Engine * e)363 void RemoteCompiler::set_state(sockstream* sock, Engine* e) {
364 auto* s = new State();
365 s->deserialize(*sock);
366 e->set_state(s);
367 delete s;
368 }
369
get_input(sockstream * sock,Engine * e)370 void RemoteCompiler::get_input(sockstream* sock, Engine* e) {
371 auto* i = e->get_input();
372 i->serialize(*sock);
373 delete i;
374 sock->flush();
375 }
376
set_input(sockstream * sock,Engine * e)377 void RemoteCompiler::set_input(sockstream* sock, Engine* e) {
378 auto* i = new Input();
379 i->deserialize(*sock);
380 e->set_input(i);
381 delete i;
382 }
383
finalize(sockstream * sock,Engine * e)384 void RemoteCompiler::finalize(sockstream* sock, Engine* e) {
385 e->finalize();
386 // This call to finalize will have primed the socket with tasks and writes
387 // Appending an OKAY rpc indicates that everything has been sent
388 Rpc(Rpc::Type::OKAY).serialize(*sock);
389 sock->flush();
390 }
391
overrides_done_step(sockstream * sock,Engine * e)392 void RemoteCompiler::overrides_done_step(sockstream* sock, Engine* e) {
393 sock->put(e->overrides_done_step() ? 1 : 0);
394 sock->flush();
395 }
396
done_step(sockstream * sock,Engine * e)397 void RemoteCompiler::done_step(sockstream* sock, Engine* e) {
398 (void) sock;
399 e->done_step();
400 }
401
overrides_done_simulation(sockstream * sock,Engine * e)402 void RemoteCompiler::overrides_done_simulation(sockstream* sock, Engine* e) {
403 sock->put(e->overrides_done_simulation() ? 1 : 0);
404 sock->flush();
405 }
406
done_simulation(sockstream * sock,Engine * e)407 void RemoteCompiler::done_simulation(sockstream* sock, Engine* e) {
408 (void) sock;
409 e->done_simulation();
410 }
411
read(sockstream * sock,Engine * e)412 void RemoteCompiler::read(sockstream* sock, Engine* e) {
413 VId id = 0;
414 sock->read(reinterpret_cast<char*>(&id), 4);
415 Bits bits;
416 bits.deserialize(*sock);
417 e->read(id, &bits);
418 }
419
evaluate(sockstream * sock,Engine * e)420 void RemoteCompiler::evaluate(sockstream* sock, Engine* e) {
421 e->evaluate();
422 // This call to evaluate will have primed the socket with tasks and writes
423 // Appending an OKAY rpc, indicates that everything has been sent.
424 Rpc(Rpc::Type::OKAY).serialize(*sock);
425 sock->flush();
426 }
427
there_are_updates(sockstream * sock,Engine * e)428 void RemoteCompiler::there_are_updates(sockstream* sock, Engine* e) {
429 sock->put(e->there_are_updates() ? 1 : 0);
430 }
431
update(sockstream * sock,Engine * e)432 void RemoteCompiler::update(sockstream* sock, Engine* e) {
433 e->update();
434 // This call to update will have primed the socket with tasks and writes
435 // Appending an OKAY rpc, indicates that everything has been sent.
436 Rpc(Rpc::Type::OKAY).serialize(*sock);
437 sock->flush();
438 }
439
there_were_tasks(sockstream * sock,Engine * e)440 void RemoteCompiler::there_were_tasks(sockstream* sock, Engine* e) {
441 sock->put(e->there_were_tasks() ? 1 : 0);
442 }
443
conditional_update(sockstream * sock,Engine * e)444 void RemoteCompiler::conditional_update(sockstream* sock, Engine* e) {
445 const auto res = e->conditional_update();
446 // This call to conditional_update will have primed the socket with tasks and
447 // writes Appending an OKAY rpc, indicates that everything has been sent.
448 Rpc(Rpc::Type::OKAY).serialize(*sock);
449 sock->put(res ? 1 : 0);
450 sock->flush();
451 }
452
open_loop(sockstream * sock,Engine * e)453 void RemoteCompiler::open_loop(sockstream* sock, Engine* e) {
454 uint32_t clk = 0;
455 sock->read(reinterpret_cast<char*>(&clk), 4);
456 bool val = (sock->get() == 1);
457 uint32_t itr = 0;
458 sock->read(reinterpret_cast<char*>(&itr), 4);
459
460 const uint32_t res = e->open_loop(clk, val, itr);
461 // This call to open_loop will have primed the socket with tasks and
462 // writes Appending an OKAY rpc, indicates that everything has been sent.
463 Rpc(Rpc::Type::OKAY).serialize(*sock);
464 sock->write(reinterpret_cast<const char*>(&res), 4);
465 sock->flush();
466 }
467
open_conn_1(sockstream * sock,const Rpc & rpc)468 void RemoteCompiler::open_conn_1(sockstream* sock, const Rpc& rpc) {
469 (void) rpc;
470 const auto pid = sock_index_.size();
471 sock_index_.push_back(make_pair(sock->descriptor(), 0));
472 Rpc(Rpc::Type::OKAY, pid, 0, 0).serialize(*sock);
473 sock->flush();
474 }
475
open_conn_2(sockstream * sock,const Rpc & rpc)476 void RemoteCompiler::open_conn_2(sockstream* sock, const Rpc& rpc) {
477 sock_index_[rpc.pid_].second = sock->descriptor();
478 Rpc(Rpc::Type::OKAY).serialize(*sock);
479 sock->flush();
480 }
481
teardown_engine(sockstream * sock,const Rpc & rpc)482 void RemoteCompiler::teardown_engine(sockstream* sock, const Rpc& rpc) {
483 { lock_guard<mutex> lg(elock_);
484 delete engines_[engine_index_[rpc.pid_][rpc.eid_]][rpc.n_];
485 engines_[engine_index_[rpc.pid_][rpc.eid_]][rpc.n_] = nullptr;
486 Rpc(Rpc::Type::OKAY).serialize(*sock);
487 sock->flush();
488 }
489 }
490
get_engine(const Rpc & rpc)491 Engine* RemoteCompiler::get_engine(const Rpc& rpc) {
492 lock_guard<mutex> lg(elock_);
493 return engines_[engine_index_[rpc.pid_][rpc.eid_]][rpc.n_];
494 }
495
496 } // namespace cascade
497