1 // Copyright 2017-2019 VMware, Inc.
2 // SPDX-License-Identifier: BSD-2-Clause
3 //
4 // The BSD-2 license (the License) set forth below applies to all parts of the
5 // Cascade project.  You may not use this file except in compliance with the
6 // License.
7 //
8 // BSD-2 License
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are met:
12 //
13 // 1. Redistributions of source code must retain the above copyright notice, this
14 // list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright notice,
17 // this list of conditions and the following disclaimer in the documentation
18 // and/or other materials provided with the distribution.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS AS IS AND
21 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 #include "target/compiler/remote_compiler.h"
32 
33 #include <cassert>
34 #include <unordered_map>
35 #include "common/log.h"
36 #include "common/sockserver.h"
37 #include "common/sockstream.h"
38 #include "target/compiler/remote_interface.h"
39 #include "target/engine.h"
40 #include "target/state.h"
41 #include "verilog/ast/ast.h"
42 #include "verilog/parse/parser.h"
43 
44 using namespace std;
45 
46 namespace cascade {
47 
RemoteCompiler()48 RemoteCompiler::RemoteCompiler() : Compiler(), Thread() {
49   set_path("/tmp/fpga_socket");
50   set_port(8800);
51 
52   sock_ = nullptr;
53 }
54 
~RemoteCompiler()55 RemoteCompiler::~RemoteCompiler() {
56   Compiler::stop_async();
57 }
58 
schedule_state_safe_interrupt(Runtime::Interrupt int_)59 void RemoteCompiler::schedule_state_safe_interrupt(Runtime::Interrupt int_) {
60   lock_guard<mutex> lg(slock_);
61 
62   // Send a state safe begin request to every registered compiler and wait
63   // for them to reply with a state safe okay
64   for (const auto& si : sock_index_) {
65     if (si.first == -1) {
66       continue;
67     }
68     auto* asock = socks_[si.first];
69     Rpc(Rpc::Type::STATE_SAFE_BEGIN).serialize(*asock);
70     asock->flush();
71     Rpc res;
72     res.deserialize(*asock);
73     assert(res.type_ == Rpc::Type::STATE_SAFE_OKAY);
74   }
75 
76   // We now have every known instance of cascade either blocked in a state safe
77   // window or reporting that it's executed its finish statement.
78   int_();
79 
80   // Send a state safe finish response to every known instance of cascade
81   for (const auto& si : sock_index_) {
82     if (si.first == -1) {
83       continue;
84     }
85     auto* asock = socks_[si.first];
86     Rpc(Rpc::Type::STATE_SAFE_FINISH).serialize(*asock);
87     asock->flush();
88   }
89 }
90 
get_interface(const std::string & loc)91 Interface* RemoteCompiler::get_interface(const std::string& loc) {
92   if (loc != "remote") {
93     return nullptr;
94   }
95   if (sock_ == nullptr) {
96     return nullptr;
97   }
98   return new RemoteInterface(sock_);
99 }
100 
set_path(const string & p)101 RemoteCompiler& RemoteCompiler::set_path(const string& p) {
102   path_ = p;
103   return *this;
104 }
105 
set_port(uint32_t p)106 RemoteCompiler& RemoteCompiler::set_port(uint32_t p) {
107   port_ = p;
108   return *this;
109 }
110 
run_logic()111 void RemoteCompiler::run_logic() {
112   sockserver tl(port_, 8);
113   sockserver ul(path_.c_str(), 8);
114   if (tl.error() || ul.error()) {
115     return;
116   }
117 
118   fd_set master_set;
119   FD_ZERO(&master_set);
120   FD_SET(tl.descriptor(), &master_set);
121   FD_SET(ul.descriptor(), &master_set);
122 
123   fd_set read_set;
124   FD_ZERO(&read_set);
125 
126   struct timeval timeout = {1, 0};
127   auto max_fd = max(tl.descriptor(), ul.descriptor());
128 
129   pool_.set_num_threads(4);
130   pool_.run();
131 
132   while (!stop_requested()) {
133     read_set = master_set;
134     select(max_fd+1, &read_set, nullptr, nullptr, &timeout);
135     for (auto i = 0; i <= max_fd; ++i) {
136 
137       // Not ready; nothing to do
138       if (!FD_ISSET(i, &read_set)) {
139         continue;
140       }
141 
142       // Listener logic: New connections are added to the read set Note that
143       // this is a write critical section for sockets so it is guarded against
144       // race conditions with the state safe interrupt handler.
145       if ((i == tl.descriptor()) || (i == ul.descriptor())) {
146         lock_guard<mutex> lg(slock_);
147         auto* sock = (i == tl.descriptor()) ? tl.accept() : ul.accept();
148         const auto fd = sock->descriptor();
149         FD_SET(fd, &master_set);
150         if (fd > max_fd) {
151           max_fd = fd;
152           socks_.resize(max_fd+1, nullptr);
153         }
154         socks_[fd] = sock;
155         continue;
156       }
157 
158       // Client: Grab the socket associated with this fd and handle the request
159       // Note that this is a read, which can't interfere with the state safe
160       // interrupt handler and thus doesn't need to be guarded.
161       auto* sock = socks_[i];
162       do {
163         Rpc rpc;
164         rpc.deserialize(*sock);
165         switch (rpc.type_) {
166 
167           // Compiler ABI: Note that these methods remove elements from the
168           // sock index which aren't used anywhere else and thus don't need to
169           // be guarded.
170           case Rpc::Type::COMPILE: {
171             compile(sock, rpc);
172             sock = nullptr;
173             socks_[i] = nullptr;
174             FD_CLR(i, &master_set);
175             break;
176           }
177           case Rpc::Type::STOP_COMPILE: {
178             stop_compile(sock, rpc);
179             sock = nullptr;
180             socks_[i] = nullptr;
181             FD_CLR(i, &master_set);
182             break;
183           }
184 
185           // Core ABI:
186           case Rpc::Type::GET_STATE:
187             get_state(sock, get_engine(rpc));
188             break;
189           case Rpc::Type::SET_STATE:
190             set_state(sock, get_engine(rpc));
191             break;
192           case Rpc::Type::GET_INPUT:
193             get_input(sock, get_engine(rpc));
194             break;
195           case Rpc::Type::SET_INPUT:
196             set_input(sock, get_engine(rpc));
197             break;
198           case Rpc::Type::FINALIZE:
199             finalize(sock, get_engine(rpc));
200             break;
201           case Rpc::Type::OVERRIDES_DONE_STEP:
202             overrides_done_step(sock, get_engine(rpc));
203             break;
204           case Rpc::Type::DONE_STEP:
205             done_step(sock, get_engine(rpc));
206             break;
207           case Rpc::Type::OVERRIDES_DONE_SIMULATION:
208             overrides_done_simulation(sock, get_engine(rpc));
209             break;
210           case Rpc::Type::DONE_SIMULATION:
211             done_simulation(sock, get_engine(rpc));
212             break;
213           case Rpc::Type::READ:
214             read(sock, get_engine(rpc));
215             break;
216           case Rpc::Type::EVALUATE:
217             evaluate(sock, get_engine(rpc));
218             break;
219           case Rpc::Type::THERE_ARE_UPDATES:
220             there_are_updates(sock, get_engine(rpc));
221             break;
222           case Rpc::Type::UPDATE:
223             update(sock, get_engine(rpc));
224             break;
225           case Rpc::Type::THERE_WERE_TASKS:
226             there_were_tasks(sock, get_engine(rpc));
227             break;
228           case Rpc::Type::CONDITIONAL_UPDATE:
229             conditional_update(sock, get_engine(rpc));
230             break;
231           case Rpc::Type::OPEN_LOOP:
232             open_loop(sock, get_engine(rpc));
233             break;
234 
235           // Proxy Compiler Codes:
236           case Rpc::Type::OPEN_CONN_1: {
237             lock_guard<mutex> lg(slock_);
238             open_conn_1(sock, rpc);
239             FD_CLR(sock->descriptor(), &master_set);
240             break;
241           }
242           case Rpc::Type::OPEN_CONN_2: {
243             lock_guard<mutex> lg(slock_);
244             open_conn_2(sock, rpc);
245             break;
246           }
247           case Rpc::Type::CLOSE_CONN: {
248             lock_guard<mutex> lg(slock_);
249             sock = nullptr;
250             delete socks_[sock_index_[rpc.pid_].first];
251             delete socks_[sock_index_[rpc.pid_].second];
252             socks_[sock_index_[rpc.pid_].first] = nullptr;
253             socks_[sock_index_[rpc.pid_].second] = nullptr;
254             FD_CLR(sock_index_[rpc.pid_].second, &master_set);
255             sock_index_[rpc.pid_] = make_pair(-1,-1);
256             break;
257           }
258 
259           // Proxy Core Codes:
260           case Rpc::Type::TEARDOWN_ENGINE:
261             teardown_engine(sock, rpc);
262             break;
263 
264           // Control reaches here innocuosly when fds are closed remotely
265           default:
266             break;
267         }
268       } while ((sock != nullptr) && (sock->rdbuf()->in_avail() > 0));
269     }
270   }
271 
272   // Stop all asynchronous compilation threads.
273   Compiler::stop_compile();
274   pool_.stop_now();
275 
276   // We have exclusive access to the indices. Delete their contents.
277   for (auto& es : engines_) {
278     for (auto* e : es) {
279       if (e != nullptr) {
280         delete e;
281       }
282     }
283   }
284   engines_.clear();
285   for (auto* s : socks_) {
286     if (s != nullptr) {
287       delete s;
288     }
289   }
290   socks_.clear();
291 }
292 
compile(sockstream * sock,const Rpc & rpc)293 void RemoteCompiler::compile(sockstream* sock, const Rpc& rpc) {
294   // Read the module declaration in the request
295   Log log;
296   Parser p(&log);
297   p.parse(*sock);
298   assert(!log.error());
299   assert((*p.begin())->is(Node::Tag::module_declaration));
300   auto* md = static_cast<ModuleDeclaration*>(*p.begin());
301 
302   // Add a new entry to the engine table if necessary
303   auto eid = 0;
304   { lock_guard<mutex> lg(elock_);
305     if (rpc.pid_ >= engine_index_.size()) {
306       engine_index_.resize(rpc.pid_+1);
307     }
308     if (rpc.eid_ >= engine_index_[rpc.pid_].size()) {
309       engine_index_[rpc.pid_].resize(rpc.eid_+1, -1);
310     }
311     engine_index_[rpc.pid_][rpc.eid_] = engines_.size();
312     eid = engine_index_[rpc.pid_][rpc.eid_];
313     engines_.resize(engines_.size()+1);
314   }
315 
316   // Now create a new thread to compile the code, enter it into the
317   // engine table, and close the socket when it's done.
318   pool_.insert([this, sock, rpc, md, eid]{
319     // TODO(eschkufz) Race condition here between when we set sock_ and when
320     // it's read.  Also, note the unguarded access to sock_index_
321     sock_ = socks_[sock_index_[rpc.pid_].second];
322     assert(sock_ != nullptr);
323     auto* e = Compiler::compile(eid, md);
324 
325     if (e != nullptr) {
326       { lock_guard<mutex> lg(elock_);
327         engines_[eid].push_back(e);
328         Rpc(Rpc::Type::OKAY, rpc.pid_, rpc.eid_, engines_[eid].size()-1).serialize(*sock);
329       }
330       sock->flush();
331     } else {
332       Rpc(Rpc::Type::FAIL).serialize(*sock);
333       sock->flush();
334     }
335     delete sock;
336   });
337 }
338 
stop_compile(sockstream * sock,const Rpc & rpc)339 void RemoteCompiler::stop_compile(sockstream* sock, const Rpc& rpc) {
340   auto eid = 0;
341   { lock_guard<mutex> lg(elock_);
342     if ((rpc.pid_ >= engine_index_.size()) || (rpc.eid_ >= engine_index_[rpc.pid_].size())) {
343       eid = -1;
344     } else {
345       eid = engine_index_[rpc.pid_][rpc.eid_];
346     }
347   }
348   if (eid != -1) {
349     Compiler::stop_compile(eid);
350   }
351   Rpc(Rpc::Type::OKAY).serialize(*sock);
352   sock->flush();
353   delete sock;
354 }
355 
get_state(sockstream * sock,Engine * e)356 void RemoteCompiler::get_state(sockstream* sock, Engine* e) {
357   auto* s = e->get_state();
358   s->serialize(*sock);
359   delete s;
360   sock->flush();
361 }
362 
set_state(sockstream * sock,Engine * e)363 void RemoteCompiler::set_state(sockstream* sock, Engine* e) {
364   auto* s = new State();
365   s->deserialize(*sock);
366   e->set_state(s);
367   delete s;
368 }
369 
get_input(sockstream * sock,Engine * e)370 void RemoteCompiler::get_input(sockstream* sock, Engine* e) {
371   auto* i = e->get_input();
372   i->serialize(*sock);
373   delete i;
374   sock->flush();
375 }
376 
set_input(sockstream * sock,Engine * e)377 void RemoteCompiler::set_input(sockstream* sock, Engine* e) {
378   auto* i = new Input();
379   i->deserialize(*sock);
380   e->set_input(i);
381   delete i;
382 }
383 
finalize(sockstream * sock,Engine * e)384 void RemoteCompiler::finalize(sockstream* sock, Engine* e) {
385   e->finalize();
386   // This call to finalize will have primed the socket with tasks and writes
387   // Appending an OKAY rpc indicates that everything has been sent
388   Rpc(Rpc::Type::OKAY).serialize(*sock);
389   sock->flush();
390 }
391 
overrides_done_step(sockstream * sock,Engine * e)392 void RemoteCompiler::overrides_done_step(sockstream* sock, Engine* e) {
393   sock->put(e->overrides_done_step() ? 1 : 0);
394   sock->flush();
395 }
396 
done_step(sockstream * sock,Engine * e)397 void RemoteCompiler::done_step(sockstream* sock, Engine* e) {
398   (void) sock;
399   e->done_step();
400 }
401 
overrides_done_simulation(sockstream * sock,Engine * e)402 void RemoteCompiler::overrides_done_simulation(sockstream* sock, Engine* e) {
403   sock->put(e->overrides_done_simulation() ? 1 : 0);
404   sock->flush();
405 }
406 
done_simulation(sockstream * sock,Engine * e)407 void RemoteCompiler::done_simulation(sockstream* sock, Engine* e) {
408   (void) sock;
409   e->done_simulation();
410 }
411 
read(sockstream * sock,Engine * e)412 void RemoteCompiler::read(sockstream* sock, Engine* e) {
413   VId id = 0;
414   sock->read(reinterpret_cast<char*>(&id), 4);
415   Bits bits;
416   bits.deserialize(*sock);
417   e->read(id, &bits);
418 }
419 
evaluate(sockstream * sock,Engine * e)420 void RemoteCompiler::evaluate(sockstream* sock, Engine* e) {
421   e->evaluate();
422   // This call to evaluate will have primed the socket with tasks and writes
423   // Appending an OKAY rpc, indicates that everything has been sent.
424   Rpc(Rpc::Type::OKAY).serialize(*sock);
425   sock->flush();
426 }
427 
there_are_updates(sockstream * sock,Engine * e)428 void RemoteCompiler::there_are_updates(sockstream* sock, Engine* e) {
429   sock->put(e->there_are_updates() ? 1 : 0);
430 }
431 
update(sockstream * sock,Engine * e)432 void RemoteCompiler::update(sockstream* sock, Engine* e) {
433   e->update();
434   // This call to update will have primed the socket with tasks and writes
435   // Appending an OKAY rpc, indicates that everything has been sent.
436   Rpc(Rpc::Type::OKAY).serialize(*sock);
437   sock->flush();
438 }
439 
there_were_tasks(sockstream * sock,Engine * e)440 void RemoteCompiler::there_were_tasks(sockstream* sock, Engine* e) {
441   sock->put(e->there_were_tasks() ? 1 : 0);
442 }
443 
conditional_update(sockstream * sock,Engine * e)444 void RemoteCompiler::conditional_update(sockstream* sock, Engine* e) {
445   const auto res = e->conditional_update();
446   // This call to conditional_update will have primed the socket with tasks and
447   // writes Appending an OKAY rpc, indicates that everything has been sent.
448   Rpc(Rpc::Type::OKAY).serialize(*sock);
449   sock->put(res ? 1 : 0);
450   sock->flush();
451 }
452 
open_loop(sockstream * sock,Engine * e)453 void RemoteCompiler::open_loop(sockstream* sock, Engine* e) {
454   uint32_t clk = 0;
455   sock->read(reinterpret_cast<char*>(&clk), 4);
456   bool val = (sock->get() == 1);
457   uint32_t itr = 0;
458   sock->read(reinterpret_cast<char*>(&itr), 4);
459 
460   const uint32_t res = e->open_loop(clk, val, itr);
461   // This call to open_loop  will have primed the socket with tasks and
462   // writes Appending an OKAY rpc, indicates that everything has been sent.
463   Rpc(Rpc::Type::OKAY).serialize(*sock);
464   sock->write(reinterpret_cast<const char*>(&res), 4);
465   sock->flush();
466 }
467 
open_conn_1(sockstream * sock,const Rpc & rpc)468 void RemoteCompiler::open_conn_1(sockstream* sock, const Rpc& rpc) {
469   (void) rpc;
470   const auto pid = sock_index_.size();
471   sock_index_.push_back(make_pair(sock->descriptor(), 0));
472   Rpc(Rpc::Type::OKAY, pid, 0, 0).serialize(*sock);
473   sock->flush();
474 }
475 
open_conn_2(sockstream * sock,const Rpc & rpc)476 void RemoteCompiler::open_conn_2(sockstream* sock, const Rpc& rpc) {
477   sock_index_[rpc.pid_].second = sock->descriptor();
478   Rpc(Rpc::Type::OKAY).serialize(*sock);
479   sock->flush();
480 }
481 
teardown_engine(sockstream * sock,const Rpc & rpc)482 void RemoteCompiler::teardown_engine(sockstream* sock, const Rpc& rpc) {
483   { lock_guard<mutex> lg(elock_);
484     delete engines_[engine_index_[rpc.pid_][rpc.eid_]][rpc.n_];
485     engines_[engine_index_[rpc.pid_][rpc.eid_]][rpc.n_] = nullptr;
486     Rpc(Rpc::Type::OKAY).serialize(*sock);
487     sock->flush();
488   }
489 }
490 
get_engine(const Rpc & rpc)491 Engine* RemoteCompiler::get_engine(const Rpc& rpc) {
492   lock_guard<mutex> lg(elock_);
493   return engines_[engine_index_[rpc.pid_][rpc.eid_]][rpc.n_];
494 }
495 
496 } // namespace cascade
497