1 /* 2 3 Copyright (C) 2015-2019 Olaf Till <i7tiol@t-online.de> 4 5 This program is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published by 7 the Free Software Foundation; either version 3 of the License, or 8 (at your option) any later version. 9 10 This program is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 GNU General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; If not, see <http://www.gnu.org/licenses/>. 17 18 */ 19 20 #ifndef __OCT_PARALLEL_CONNECTION__ 21 22 #define __OCT_PARALLEL_CONNECTION__ 23 24 #include <octave/oct.h> 25 #include <octave/oct-refcount.h> 26 27 #include <fstream> 28 #include <string.h> 29 30 #include "gnulib-wrappers.h" 31 32 #include "minimal-load-save.h" 33 34 // at 'p' 37 bytes must be available; returns 0 on success and -1 on failure 35 int oct_parallel_store_unique_identifier (char *p); 36 37 class 38 octave_parallel_connection 39 { 40 public: 41 octave_parallel_connection(const char * arg_peer_name,bool i_am_server,const char * auuid)42 octave_parallel_connection (const char *arg_peer_name, bool i_am_server, 43 const char *auuid) 44 : peer_node_n (-1), cmd_stream (NULL), data_stream (NULL), 45 connection_open (false), pseudo_connection (false), 46 peer_name (arg_peer_name), nproc (0), nlocaljobs (0), 47 server (i_am_server), uuid (auuid) 48 { 49 50 } 51 octave_parallel_connection(bool i_am_server,const char * auuid)52 octave_parallel_connection (bool i_am_server, const char *auuid) 53 : peer_node_n (-1), cmd_stream (NULL), data_stream (NULL), 54 connection_open (false), pseudo_connection (true), 55 peer_name (""), nproc (0), nlocaljobs (0), 56 server (i_am_server), uuid (auuid) 57 58 59 { 60 61 } 62 insert_cmd_stream(octave_parallel_stream * s)63 octave_parallel_connection *insert_cmd_stream (octave_parallel_stream *s) 64 { 65 cmd_stream = s; 66 67 connection_open = true; 68 69 return this; 70 } 71 insert_data_stream(octave_parallel_stream * s)72 octave_parallel_connection *insert_data_stream (octave_parallel_stream *s) 73 { 74 data_stream = s; 75 76 connection_open = true; 77 78 return this; 79 } 80 set_nproc(uint32_t anproc)81 void set_nproc (uint32_t anproc) 82 { 83 nproc = anproc; 84 } 85 set_nlocaljobs(uint32_t anlocaljobs)86 void set_nlocaljobs (uint32_t anlocaljobs) 87 { 88 nlocaljobs = anlocaljobs; 89 } 90 get_nproc(void)91 uint32_t get_nproc (void) 92 { 93 return nproc; 94 } 95 get_nlocaljobs(void)96 uint32_t get_nlocaljobs (void) 97 { 98 return nlocaljobs; 99 } 100 get_cmd_stream(void)101 octave_parallel_stream *get_cmd_stream (void) const 102 { 103 return cmd_stream; 104 } 105 get_data_stream(void)106 octave_parallel_stream *get_data_stream (void) const 107 { 108 return data_stream; 109 } 110 poll_for_errors(void)111 int poll_for_errors (void) 112 { 113 if (! cmd_stream) 114 { 115 _p_error ("can't poll since no command stream present"); 116 117 return -1; 118 } 119 120 if (! cmd_stream->get_strbuf ()-> good ()) 121 { 122 _p_error ("can't poll since command stream not good"); 123 124 return -1; 125 } 126 127 int ret = 0; 128 129 uint32_t err; 130 while (cmd_stream->get_strbuf ()->in_avail () > 0) 131 { 132 err = cmd_stream->network_recv_4byteint (err); 133 134 if (! cmd_stream->good ()) 135 { 136 _p_error ("error in reading error code"); 137 138 return -1; 139 } 140 141 _p_error ("An error with code %u occurred in server %s.\nSee %s:/tmp/octave_error-<remote-host-name>_%s.log for details.", 142 err, peer_name.c_str (), peer_name.c_str (), uuid.c_str ()); 143 144 ret = 1; 145 } 146 147 return ret; 148 } 149 150 int wait_for_errors_or_data (void); 151 get_peer_name(void)152 const std::string &get_peer_name (void) 153 { 154 return peer_name; 155 } 156 ~octave_parallel_connection(void)157 ~octave_parallel_connection (void) 158 { 159 dlprintf ("pconnection destructor called\n"); 160 if (connection_open) 161 close (); 162 163 if (cmd_stream) 164 { 165 dlprintf ("pconnection destructor will delete cmd_stream\n"); 166 delete cmd_stream; 167 } 168 169 if (data_stream) 170 { 171 dlprintf ("pconnection destructor will delete data_stream\n"); 172 delete data_stream; 173 } 174 } 175 delete_data_stream(void)176 int delete_data_stream (void) 177 { 178 if (data_stream) 179 { 180 data_stream->close (); 181 182 if (data_stream->io_error ()) 183 { 184 _p_error ("could not close data stream"); 185 186 return -1; 187 } 188 189 delete data_stream; 190 191 data_stream = NULL; 192 } 193 194 return 0; 195 } 196 close(void)197 int close (void) 198 { 199 int ret = 0; 200 201 if (connection_open) 202 { 203 if (cmd_stream) 204 { 205 if (! server) 206 { 207 poll_for_errors (); 208 209 cmd_stream->network_send_string 210 ("sclose (sockets); __exit__;\n"); 211 } 212 213 cmd_stream->close (); 214 215 if (cmd_stream->io_error ()) 216 { 217 _p_error ("could not close command stream"); 218 219 ret = -1; 220 } 221 } 222 223 if (data_stream) 224 { 225 data_stream->close (); 226 227 if (data_stream->io_error ()) 228 { 229 _p_error ("could not close data stream"); 230 231 ret = -1; 232 } 233 } 234 235 connection_open = false; 236 237 return ret; 238 } 239 else if (! pseudo_connection) 240 { 241 _p_error ("connection is not open"); 242 243 return -1; 244 } 245 else 246 return ret; 247 } 248 is_pseudo_connection(void)249 bool is_pseudo_connection (void) 250 { 251 return pseudo_connection; 252 } 253 is_server(void)254 bool is_server (void) 255 { 256 return server; 257 } 258 connection_read_header(void)259 int connection_read_header (void) 260 { 261 return minimal_read_header (data_stream->get_istream (), 262 swap, flt_fmt); 263 } 264 connection_read_data(octave_value & tc)265 int connection_read_data (octave_value &tc) 266 { 267 return minimal_read_data (data_stream->get_istream (), tc, 268 swap, flt_fmt); 269 } 270 is_open(void)271 bool is_open (void) 272 { 273 return connection_open; 274 } 275 get_uuid(void)276 const std::string &get_uuid (void) 277 { 278 return uuid; 279 } 280 281 int32_t peer_node_n; // -1 is unset, 0 is client 282 283 private: 284 285 octave_parallel_stream *cmd_stream; 286 287 octave_parallel_stream *data_stream; 288 289 bool connection_open; 290 291 bool pseudo_connection; // for indicating own node in network 292 293 std::string peer_name; 294 295 // number of usable processor cores in peer, or in own machine if 296 // this is a pseudoconnection; num_processors() returns an unsigned 297 // long int, but there is no specific transefer function implemented 298 // here for long int, and such large numbers probably don't occur 299 // for nproc; supposed to be available (i.e. non-zero) for all 300 // connections in the clients network variable; not necessary in the 301 // servers network variables 302 uint32_t nproc; 303 304 // configurable, this information can be used by the client to 305 // decide how many local processes should be spawned at the server; 306 // not necessary in the servers network variables 307 uint32_t nlocaljobs; 308 309 bool server; 310 311 std::string uuid; 312 313 // for Octaves save and load functions 314 bool global; 315 bool swap; 316 OCTAVE__MACH_INFO::float_format flt_fmt; 317 std::string doc; 318 }; 319 320 class 321 octave_parallel_network 322 { 323 public: 324 325 typedef Array<octave_parallel_connection *> connarray; 326 octave_parallel_network(void)327 octave_parallel_network (void) : refcount (0), closed (false) 328 { 329 330 } 331 octave_parallel_network(int n)332 octave_parallel_network (int n) 333 : connections (dim_vector (n, 1), NULL), refcount (0), closed (false) 334 { 335 336 } 337 resize(int n)338 void resize (int n) 339 { 340 connections.resize1 (n, NULL); 341 } 342 add_connection(octave_parallel_connection * connection)343 void add_connection (octave_parallel_connection *connection) 344 { 345 connections.resize1 (connections.numel () + 1, connection); 346 } 347 insert_connection(octave_parallel_connection * connection,int idx)348 void insert_connection (octave_parallel_connection *connection, int idx) 349 { 350 connections(idx) = connection; 351 352 connection->peer_node_n = idx; 353 } 354 is_connection(int idx)355 bool is_connection (int idx) 356 { 357 return (bool) connections(idx); 358 } 359 close(void)360 int close (void) 361 { 362 if (closed) 363 { 364 _p_error ("network already closed"); 365 366 return -1; 367 } 368 else 369 { 370 int ret = 0; 371 372 for (int i = 0; i < connections.numel (); i++) 373 if (connections(i) && connections(i)->close ()) 374 ret = -1; 375 376 if (ret) 377 _p_error ("could not close network"); 378 else 379 closed = true; 380 381 return ret; 382 } 383 } 384 ~octave_parallel_network(void)385 ~octave_parallel_network (void) 386 { 387 dlprintf ("pnetwork destructor called ...\n"); 388 for (int i = 0; i < connections.numel (); i++) 389 { 390 dlprintf ("... checks that connections(%i) is not NULL ...\n", i); 391 if (connections(i)) 392 { 393 dlprintf ("... and deletes it.\n"); 394 delete connections(i); 395 } 396 } 397 } 398 get_ref(void)399 const connarray &get_ref (void) 400 { 401 refcount++; 402 403 return connections; 404 } 405 release_ref(void)406 int release_ref (void) 407 { 408 return --refcount; 409 } 410 411 is_closed(void)412 bool is_closed (void) 413 { 414 return closed; 415 } 416 417 private: 418 419 connarray connections; 420 421 OCTAVE__REFCOUNT<int> refcount; 422 423 bool closed; 424 }; 425 426 // This additional class is declared to circumvent const-ness of 427 // octave_value::get_rep() without a <const_cast>. It contains the 428 // members specific for the package, while the containing class 429 // octave_parallel_connections contains the stuff related to 430 // octave_base_value. 431 class 432 octave_parallel_connections_rep 433 { 434 friend class octave_parallel_connections; 435 436 public: 437 octave_parallel_connections_rep(octave_parallel_network * net,const char * auuid,bool i_am_server)438 octave_parallel_connections_rep (octave_parallel_network *net, 439 const char *auuid, bool i_am_server) 440 : network (net), connections (net->get_ref ()), uuid (auuid), 441 server (i_am_server), indexed (false) 442 { 443 subnet = connections; 444 } 445 446 // for indexing octave_parallel_connections_rep(octave_parallel_network * net,const char * auuid,bool i_am_server,octave_parallel_network::connarray & asubnet)447 octave_parallel_connections_rep (octave_parallel_network *net, 448 const char *auuid, bool i_am_server, 449 octave_parallel_network::connarray &asubnet) 450 : network (net), connections (net->get_ref ()), uuid (auuid), 451 server (i_am_server), indexed (true) 452 { 453 subnet = asubnet; 454 } 455 ~octave_parallel_connections_rep(void)456 ~octave_parallel_connections_rep (void) 457 { 458 dlprintf ("pconnections_rep destructor called, will request decr of network refcount and will check if it's <= 0 ...\n"); 459 int tpdebug; 460 if ((tpdebug = network->release_ref ()) <= 0) 461 { 462 dlprintf ("... since it is %i, will delete network\n", tpdebug); 463 delete network; 464 } 465 } 466 close(void)467 int close (void) 468 { 469 return network->close (); 470 } 471 472 // function to return the _whole_ net (even if this is a subnet), to 473 // make it possible to construct a new octave_value with this get_whole_network(void)474 octave_parallel_network *get_whole_network (void) 475 { 476 return network; 477 } 478 479 // oct-functions work with this get_connections(void)480 const octave_parallel_network::connarray &get_connections (void) 481 { 482 return subnet; 483 } 484 is_server(void)485 bool is_server (void) 486 { 487 return server; 488 } 489 get_all_connections(void)490 const octave_parallel_network::connarray &get_all_connections (void) 491 { 492 return connections; 493 } 494 495 private: 496 497 octave_parallel_network *network; 498 499 const octave_parallel_network::connarray &connections; 500 501 octave_parallel_network::connarray subnet; 502 503 std::string uuid; 504 505 bool server; 506 507 bool indexed; 508 }; 509 510 class 511 octave_parallel_connections : public octave_base_value 512 { 513 public: 514 octave_parallel_register_type(void)515 void octave_parallel_register_type (void) 516 { 517 static bool type_registered = false; 518 519 if (! type_registered) 520 { 521 register_type (); 522 523 type_registered = true; 524 } 525 } 526 octave_parallel_connections(octave_parallel_network * net,const char * auuid,bool i_am_server)527 octave_parallel_connections (octave_parallel_network *net, 528 const char *auuid, bool i_am_server) 529 : rep (new octave_parallel_connections_rep (net, auuid, i_am_server)) 530 { 531 octave_parallel_register_type (); 532 } 533 534 // for indexing octave_parallel_connections(octave_parallel_network * net,const char * auuid,bool i_am_server,octave_parallel_network::connarray & asubnet)535 octave_parallel_connections (octave_parallel_network *net, 536 const char *auuid, bool i_am_server, 537 octave_parallel_network::connarray &asubnet) 538 : rep (new octave_parallel_connections_rep (net, auuid, i_am_server, 539 asubnet)) 540 { 541 octave_parallel_register_type (); 542 } 543 ~octave_parallel_connections(void)544 ~octave_parallel_connections (void) 545 { 546 dlprintf ("pconnections destructor called, deletes pconnections_rep\n"); 547 delete rep; 548 } 549 550 // Octave internal stuff 551 552 octave_value do_index_op (const octave_value_list& idx, 553 bool resize_ok = false); 554 do_multi_index_op(int,const octave_value_list & idx)555 octave_value_list do_multi_index_op (int, const octave_value_list& idx) 556 { 557 return do_index_op (idx); 558 } 559 560 // copied from ov-base-mat subsref(const std::string & type,const std::list<octave_value_list> & idx)561 octave_value subsref (const std::string& type, 562 const std::list<octave_value_list>& idx) 563 { 564 octave_value retval; 565 566 switch (type[0]) 567 { 568 case '(': 569 retval = do_index_op (idx.front ()); 570 break; 571 572 case '{': 573 case '.': 574 { 575 std::string nm = type_name (); 576 error ("%s cannot be indexed with %c", nm.c_str (), type[0]); 577 } 578 break; 579 580 default: 581 panic_impossible (); 582 } 583 584 return retval.next_subsref (type, idx); 585 } 586 subsref(const std::string & type,const std::list<octave_value_list> & idx,int)587 octave_value_list subsref (const std::string& type, 588 const std::list<octave_value_list>& idx, int) 589 { 590 return subsref (type, idx); 591 } 592 is_constant(void)593 bool is_constant (void) const 594 { 595 return true; 596 } 597 is_defined(void)598 bool is_defined (void) const 599 { 600 return true; 601 } 602 is_true(void)603 bool is_true (void) const 604 { 605 return ! rep->network->is_closed (); 606 } 607 dims(void)608 dim_vector dims (void) const 609 { 610 return rep->subnet.dims (); 611 } 612 613 void print_raw (std::ostream& os, bool pr_as_read_syntax = false) const 614 { 615 octave_idx_type n = rep->connections.numel (); 616 octave_idx_type sn = rep->subnet.numel (); 617 618 indent (os); 619 620 os << "<parallel cluster connections object> "; 621 if (sn < n) 622 os << "subnet with " << sn << " of "; 623 os << n << " nodes"; 624 newline (os); 625 626 os << "network id: " << rep->uuid.c_str (); 627 newline (os); 628 629 if (rep->network->is_closed ()) 630 os << "----- closed -----"; 631 else 632 os << "----- open -----"; 633 newline (os); 634 635 for (octave_idx_type i = 0; i < sn; i++) 636 { 637 os << rep->subnet(i)->peer_node_n << ") "; 638 639 // if (rep->subnet(i)->is_server ()) 640 // os << "[server] "; 641 // else 642 // os << "[client] "; 643 644 if (rep->subnet(i)->is_pseudo_connection ()) 645 os << "<local machine>"; 646 else 647 os << rep->subnet(i)->get_peer_name ().c_str (); 648 649 newline (os); 650 } 651 } 652 653 void print (std::ostream& os, bool pr_as_read_syntax = false) const 654 { 655 print_raw (os); 656 } 657 // Octave changeset bcd71a2531d3 (Jan 31st 2014) made 658 // octave_base_value::print() non-const, after that this virtual 659 // function is not re-defined by the above print() function. Having 660 // both const and non-const print() here seems to work both with 661 // Octave < and >= bcd71a2531d3 (print() is only called over the 662 // parent class virtual function). 663 void print (std::ostream& os, bool pr_as_read_syntax = false) 664 { 665 print_raw (os); 666 } 667 print_as_scalar(void)668 bool print_as_scalar (void) const { return true; } 669 get_rep(void)670 octave_parallel_connections_rep *get_rep (void) const 671 { 672 return rep; 673 } 674 675 private: 676 677 // needed by Octave for register_type(), also used here in indexing octave_parallel_connections(void)678 octave_parallel_connections (void) 679 : rep (new octave_parallel_connections_rep 680 (new octave_parallel_network (), "", false)) 681 { 682 dlprintf ("pconnections default ctor called\n"); 683 } 684 685 octave_parallel_connections_rep *rep; 686 687 DECLARE_OV_TYPEID_FUNCTIONS_AND_DATA 688 }; 689 690 #endif // __OCT_PARALLEL_CONNECTION__ 691 692 693 /* 694 ;;; Local Variables: *** 695 ;;; mode: C++ *** 696 ;;; End: *** 697 */ 698