1 /*
2 
3 Copyright (C) 2015-2019 Olaf Till <i7tiol@t-online.de>
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 3 of the License, or
8 (at your option) any later version.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; If not, see <http://www.gnu.org/licenses/>.
17 
18 */
19 
20 #ifndef __OCT_PARALLEL_CONNECTION__
21 
22 #define __OCT_PARALLEL_CONNECTION__
23 
24 #include <octave/oct.h>
25 #include <octave/oct-refcount.h>
26 
27 #include <fstream>
28 #include <string.h>
29 
30 #include "gnulib-wrappers.h"
31 
32 #include "minimal-load-save.h"
33 
// The buffer at 'p' must provide at least 37 bytes; returns 0 on
// success and -1 on failure.
35 int oct_parallel_store_unique_identifier (char *p);
36 
37 class
38 octave_parallel_connection
39 {
40 public:
41 
octave_parallel_connection(const char * arg_peer_name,bool i_am_server,const char * auuid)42   octave_parallel_connection (const char *arg_peer_name, bool i_am_server,
43                               const char *auuid)
44     : peer_node_n (-1), cmd_stream (NULL), data_stream (NULL),
45       connection_open (false), pseudo_connection (false),
46       peer_name (arg_peer_name), nproc (0), nlocaljobs (0),
47       server (i_am_server), uuid (auuid)
48   {
49 
50   }
51 
octave_parallel_connection(bool i_am_server,const char * auuid)52   octave_parallel_connection (bool i_am_server, const char *auuid)
53     : peer_node_n (-1), cmd_stream (NULL), data_stream (NULL),
54       connection_open (false), pseudo_connection (true),
55       peer_name (""), nproc (0), nlocaljobs (0),
56       server (i_am_server), uuid (auuid)
57 
58 
59   {
60 
61   }
62 
insert_cmd_stream(octave_parallel_stream * s)63   octave_parallel_connection *insert_cmd_stream (octave_parallel_stream *s)
64   {
65     cmd_stream = s;
66 
67     connection_open = true;
68 
69     return this;
70   }
71 
insert_data_stream(octave_parallel_stream * s)72   octave_parallel_connection *insert_data_stream (octave_parallel_stream *s)
73   {
74     data_stream = s;
75 
76     connection_open = true;
77 
78     return this;
79   }
80 
set_nproc(uint32_t anproc)81   void set_nproc (uint32_t anproc)
82   {
83     nproc = anproc;
84   }
85 
set_nlocaljobs(uint32_t anlocaljobs)86   void set_nlocaljobs (uint32_t anlocaljobs)
87   {
88     nlocaljobs = anlocaljobs;
89   }
90 
get_nproc(void)91   uint32_t get_nproc (void)
92   {
93     return nproc;
94   }
95 
get_nlocaljobs(void)96   uint32_t get_nlocaljobs (void)
97   {
98     return nlocaljobs;
99   }
100 
get_cmd_stream(void)101   octave_parallel_stream *get_cmd_stream (void) const
102   {
103     return cmd_stream;
104   }
105 
get_data_stream(void)106   octave_parallel_stream *get_data_stream (void) const
107   {
108     return data_stream;
109   }
110 
poll_for_errors(void)111   int poll_for_errors (void)
112   {
113     if (! cmd_stream)
114       {
115         _p_error ("can't poll since no command stream present");
116 
117         return -1;
118       }
119 
120     if (! cmd_stream->get_strbuf ()-> good ())
121       {
122         _p_error ("can't poll since command stream not good");
123 
124         return -1;
125       }
126 
127     int ret = 0;
128 
129     uint32_t err;
130     while (cmd_stream->get_strbuf ()->in_avail () > 0)
131       {
132         err = cmd_stream->network_recv_4byteint (err);
133 
134         if (! cmd_stream->good ())
135           {
136             _p_error ("error in reading error code");
137 
138             return -1;
139           }
140 
141         _p_error ("An error with code %u occurred in server %s.\nSee %s:/tmp/octave_error-<remote-host-name>_%s.log for details.",
142                err, peer_name.c_str (), peer_name.c_str (), uuid.c_str ());
143 
144         ret = 1;
145       }
146 
147     return ret;
148   }
149 
150   int wait_for_errors_or_data (void);
151 
get_peer_name(void)152   const std::string &get_peer_name (void)
153   {
154     return peer_name;
155   }
156 
~octave_parallel_connection(void)157   ~octave_parallel_connection (void)
158   {
159     dlprintf ("pconnection destructor called\n");
160     if (connection_open)
161       close ();
162 
163     if (cmd_stream)
164       {
165         dlprintf ("pconnection destructor will delete cmd_stream\n");
166         delete cmd_stream;
167       }
168 
169     if (data_stream)
170       {
171         dlprintf ("pconnection destructor will delete data_stream\n");
172         delete data_stream;
173       }
174   }
175 
delete_data_stream(void)176   int delete_data_stream (void)
177   {
178     if (data_stream)
179       {
180         data_stream->close ();
181 
182         if (data_stream->io_error ())
183           {
184             _p_error ("could not close data stream");
185 
186             return -1;
187           }
188 
189         delete data_stream;
190 
191         data_stream = NULL;
192       }
193 
194     return 0;
195   }
196 
close(void)197   int close (void)
198   {
199     int ret = 0;
200 
201     if (connection_open)
202       {
203         if (cmd_stream)
204           {
205             if (! server)
206               {
207                 poll_for_errors ();
208 
209                 cmd_stream->network_send_string
210                   ("sclose (sockets); __exit__;\n");
211               }
212 
213             cmd_stream->close ();
214 
215             if (cmd_stream->io_error ())
216               {
217                 _p_error ("could not close command stream");
218 
219                 ret = -1;
220               }
221           }
222 
223         if (data_stream)
224           {
225             data_stream->close ();
226 
227             if (data_stream->io_error ())
228               {
229                 _p_error ("could not close data stream");
230 
231                 ret = -1;
232               }
233           }
234 
235         connection_open = false;
236 
237         return ret;
238       }
239     else if (! pseudo_connection)
240       {
241         _p_error ("connection is not open");
242 
243         return -1;
244       }
245     else
246       return ret;
247   }
248 
is_pseudo_connection(void)249   bool is_pseudo_connection (void)
250   {
251     return pseudo_connection;
252   }
253 
is_server(void)254   bool is_server (void)
255   {
256     return server;
257   }
258 
connection_read_header(void)259   int connection_read_header (void)
260   {
261     return minimal_read_header (data_stream->get_istream (),
262                                 swap, flt_fmt);
263   }
264 
connection_read_data(octave_value & tc)265   int connection_read_data (octave_value &tc)
266   {
267     return minimal_read_data (data_stream->get_istream (), tc,
268                               swap, flt_fmt);
269   }
270 
is_open(void)271   bool is_open (void)
272   {
273     return connection_open;
274   }
275 
get_uuid(void)276   const std::string &get_uuid (void)
277   {
278     return uuid;
279   }
280 
281   int32_t peer_node_n; // -1 is unset, 0 is client
282 
283 private:
284 
285   octave_parallel_stream *cmd_stream;
286 
287   octave_parallel_stream *data_stream;
288 
289   bool connection_open;
290 
291   bool pseudo_connection; // for indicating own node in network
292 
293   std::string peer_name;
294 
295   // number of usable processor cores in peer, or in own machine if
296   // this is a pseudoconnection; num_processors() returns an unsigned
297   // long int, but there is no specific transefer function implemented
298   // here for long int, and such large numbers probably don't occur
299   // for nproc; supposed to be available (i.e. non-zero) for all
300   // connections in the clients network variable; not necessary in the
301   // servers network variables
302   uint32_t nproc;
303 
304   // configurable, this information can be used by the client to
305   // decide how many local processes should be spawned at the server;
306   // not necessary in the servers network variables
307   uint32_t nlocaljobs;
308 
309   bool server;
310 
311   std::string uuid;
312 
313   // for Octaves save and load functions
314   bool global;
315   bool swap;
316   OCTAVE__MACH_INFO::float_format flt_fmt;
317   std::string doc;
318 };
319 
320 class
321 octave_parallel_network
322 {
323 public:
324 
325   typedef Array<octave_parallel_connection *> connarray;
326 
octave_parallel_network(void)327   octave_parallel_network (void) : refcount (0), closed (false)
328   {
329 
330   }
331 
octave_parallel_network(int n)332   octave_parallel_network (int n)
333     : connections (dim_vector (n, 1), NULL), refcount (0), closed (false)
334   {
335 
336   }
337 
resize(int n)338   void resize (int n)
339   {
340     connections.resize1 (n, NULL);
341   }
342 
add_connection(octave_parallel_connection * connection)343   void add_connection (octave_parallel_connection *connection)
344   {
345     connections.resize1 (connections.numel () + 1, connection);
346   }
347 
insert_connection(octave_parallel_connection * connection,int idx)348   void insert_connection (octave_parallel_connection *connection, int idx)
349   {
350     connections(idx) = connection;
351 
352     connection->peer_node_n = idx;
353   }
354 
is_connection(int idx)355   bool is_connection (int idx)
356   {
357     return (bool) connections(idx);
358   }
359 
close(void)360   int close (void)
361   {
362     if (closed)
363       {
364         _p_error ("network already closed");
365 
366         return -1;
367       }
368     else
369       {
370         int ret = 0;
371 
372         for (int i = 0; i < connections.numel (); i++)
373           if (connections(i) && connections(i)->close ())
374             ret = -1;
375 
376         if (ret)
377           _p_error ("could not close network");
378         else
379           closed = true;
380 
381         return ret;
382       }
383   }
384 
~octave_parallel_network(void)385   ~octave_parallel_network (void)
386   {
387     dlprintf ("pnetwork destructor called ...\n");
388     for (int i = 0; i < connections.numel (); i++)
389       {
390         dlprintf ("... checks that connections(%i) is not NULL ...\n", i);
391         if (connections(i))
392           {
393             dlprintf ("... and deletes it.\n");
394             delete connections(i);
395           }
396       }
397   }
398 
get_ref(void)399   const connarray &get_ref (void)
400   {
401     refcount++;
402 
403     return connections;
404   }
405 
release_ref(void)406   int release_ref (void)
407   {
408     return --refcount;
409   }
410 
411 
is_closed(void)412   bool is_closed (void)
413   {
414     return closed;
415   }
416 
417 private:
418 
419   connarray connections;
420 
421   OCTAVE__REFCOUNT<int> refcount;
422 
423   bool closed;
424 };
425 
426 // This additional class is declared to circumvent const-ness of
427 // octave_value::get_rep() without a <const_cast>. It contains the
428 // members specific for the package, while the containing class
429 // octave_parallel_connections contains the stuff related to
430 // octave_base_value.
431 class
432 octave_parallel_connections_rep
433 {
434   friend class octave_parallel_connections;
435 
436 public:
437 
octave_parallel_connections_rep(octave_parallel_network * net,const char * auuid,bool i_am_server)438   octave_parallel_connections_rep (octave_parallel_network *net,
439                                    const char *auuid, bool i_am_server)
440     : network (net), connections (net->get_ref ()), uuid (auuid),
441       server (i_am_server), indexed (false)
442   {
443     subnet = connections;
444   }
445 
446   // for indexing
octave_parallel_connections_rep(octave_parallel_network * net,const char * auuid,bool i_am_server,octave_parallel_network::connarray & asubnet)447   octave_parallel_connections_rep (octave_parallel_network *net,
448                                    const char *auuid, bool i_am_server,
449                                    octave_parallel_network::connarray &asubnet)
450     : network (net), connections (net->get_ref ()), uuid (auuid),
451       server (i_am_server), indexed (true)
452   {
453     subnet = asubnet;
454   }
455 
~octave_parallel_connections_rep(void)456   ~octave_parallel_connections_rep (void)
457   {
458     dlprintf ("pconnections_rep destructor called, will request decr of network refcount and will check if it's <= 0 ...\n");
459     int tpdebug;
460     if ((tpdebug = network->release_ref ()) <= 0)
461       {
462         dlprintf ("... since it is %i, will delete network\n", tpdebug);
463         delete network;
464       }
465   }
466 
close(void)467   int close (void)
468   {
469     return network->close ();
470   }
471 
472   // function to return the _whole_ net (even if this is a subnet), to
473   // make it possible to construct a new octave_value with this
get_whole_network(void)474   octave_parallel_network *get_whole_network (void)
475   {
476     return network;
477   }
478 
479   // oct-functions work with this
get_connections(void)480   const octave_parallel_network::connarray &get_connections (void)
481   {
482     return subnet;
483   }
484 
is_server(void)485   bool is_server (void)
486   {
487     return server;
488   }
489 
get_all_connections(void)490   const octave_parallel_network::connarray &get_all_connections (void)
491   {
492     return connections;
493   }
494 
495 private:
496 
497   octave_parallel_network *network;
498 
499   const octave_parallel_network::connarray &connections;
500 
501   octave_parallel_network::connarray subnet;
502 
503   std::string uuid;
504 
505   bool server;
506 
507   bool indexed;
508 };
509 
510 class
511 octave_parallel_connections : public octave_base_value
512 {
513 public:
514 
octave_parallel_register_type(void)515   void octave_parallel_register_type (void)
516   {
517     static bool type_registered = false;
518 
519     if (! type_registered)
520       {
521         register_type ();
522 
523         type_registered = true;
524       }
525   }
526 
octave_parallel_connections(octave_parallel_network * net,const char * auuid,bool i_am_server)527   octave_parallel_connections (octave_parallel_network *net,
528                                const char *auuid, bool i_am_server)
529     : rep (new octave_parallel_connections_rep (net, auuid, i_am_server))
530   {
531     octave_parallel_register_type ();
532   }
533 
534   // for indexing
octave_parallel_connections(octave_parallel_network * net,const char * auuid,bool i_am_server,octave_parallel_network::connarray & asubnet)535   octave_parallel_connections (octave_parallel_network *net,
536                                const char *auuid, bool i_am_server,
537                                octave_parallel_network::connarray &asubnet)
538     : rep (new octave_parallel_connections_rep (net, auuid, i_am_server,
539                                                 asubnet))
540   {
541     octave_parallel_register_type ();
542   }
543 
~octave_parallel_connections(void)544   ~octave_parallel_connections (void)
545   {
546     dlprintf ("pconnections destructor called, deletes pconnections_rep\n");
547     delete rep;
548   }
549 
550   // Octave internal stuff
551 
552   octave_value do_index_op (const octave_value_list& idx,
553                             bool resize_ok = false);
554 
do_multi_index_op(int,const octave_value_list & idx)555   octave_value_list do_multi_index_op (int, const octave_value_list& idx)
556   {
557     return do_index_op (idx);
558   }
559 
560   // copied from ov-base-mat
subsref(const std::string & type,const std::list<octave_value_list> & idx)561   octave_value subsref (const std::string& type,
562                         const std::list<octave_value_list>& idx)
563   {
564     octave_value retval;
565 
566     switch (type[0])
567       {
568       case '(':
569         retval = do_index_op (idx.front ());
570         break;
571 
572       case '{':
573       case '.':
574         {
575           std::string nm = type_name ();
576           error ("%s cannot be indexed with %c", nm.c_str (), type[0]);
577         }
578         break;
579 
580       default:
581         panic_impossible ();
582       }
583 
584     return retval.next_subsref (type, idx);
585   }
586 
subsref(const std::string & type,const std::list<octave_value_list> & idx,int)587   octave_value_list subsref (const std::string& type,
588                              const std::list<octave_value_list>& idx, int)
589   {
590     return subsref (type, idx);
591   }
592 
is_constant(void)593   bool is_constant (void) const
594   {
595     return true;
596   }
597 
is_defined(void)598   bool is_defined (void) const
599   {
600     return true;
601   }
602 
is_true(void)603   bool is_true (void) const
604   {
605     return ! rep->network->is_closed ();
606   }
607 
dims(void)608   dim_vector dims (void) const
609   {
610     return rep->subnet.dims ();
611   }
612 
613   void print_raw (std::ostream& os, bool pr_as_read_syntax = false) const
614   {
615     octave_idx_type n = rep->connections.numel ();
616     octave_idx_type sn = rep->subnet.numel ();
617 
618     indent (os);
619 
620     os << "<parallel cluster connections object> ";
621     if (sn < n)
622       os << "subnet with " << sn << " of ";
623     os << n << " nodes";
624     newline (os);
625 
626     os << "network id: " << rep->uuid.c_str ();
627     newline (os);
628 
629     if (rep->network->is_closed ())
630       os << "----- closed -----";
631     else
632       os << "----- open -----";
633     newline (os);
634 
635     for (octave_idx_type i = 0; i < sn; i++)
636       {
637         os << rep->subnet(i)->peer_node_n << ") ";
638 
639         // if (rep->subnet(i)->is_server ())
640         //   os << "[server] ";
641         // else
642         //   os << "[client] ";
643 
644         if (rep->subnet(i)->is_pseudo_connection ())
645           os << "<local machine>";
646         else
647           os << rep->subnet(i)->get_peer_name ().c_str ();
648 
649         newline (os);
650       }
651   }
652 
653   void print (std::ostream& os, bool pr_as_read_syntax = false) const
654   {
655     print_raw (os);
656   }
657   // Octave changeset bcd71a2531d3 (Jan 31st 2014) made
658   // octave_base_value::print() non-const, after that this virtual
659   // function is not re-defined by the above print() function.  Having
660   // both const and non-const print() here seems to work both with
661   // Octave < and >= bcd71a2531d3 (print() is only called over the
662   // parent class virtual function).
663   void print (std::ostream& os, bool pr_as_read_syntax = false)
664   {
665     print_raw (os);
666   }
667 
print_as_scalar(void)668   bool print_as_scalar (void) const { return true; }
669 
get_rep(void)670   octave_parallel_connections_rep *get_rep (void) const
671   {
672     return rep;
673   }
674 
675 private:
676 
677   // needed by Octave for register_type(), also used here in indexing
octave_parallel_connections(void)678   octave_parallel_connections (void)
679   : rep (new octave_parallel_connections_rep
680          (new octave_parallel_network (), "", false))
681     {
682       dlprintf ("pconnections default ctor called\n");
683     }
684 
685   octave_parallel_connections_rep *rep;
686 
687   DECLARE_OV_TYPEID_FUNCTIONS_AND_DATA
688 };
689 
690 #endif // __OCT_PARALLEL_CONNECTION__
691 
692 
693 /*
694 ;;; Local Variables: ***
695 ;;; mode: C++ ***
696 ;;; End: ***
697 */
698