1 /*
2 
3 Copyright (C) 2002 Hayato Fujiwara <h_fujiwara@users.sourceforge.net>
4 Copyright (C) 2010-2020 Olaf Till <i7tiol@t-online.de>
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10 
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU General Public License for more details.
15 
16 You should have received a copy of the GNU General Public License
17 along with this program; If not, see <http://www.gnu.org/licenses/>.
18 
19 */
20 
21 // PKG_ADD: autoload ("__octave_server__", "parallel_interface.oct");
22 // PKG_DEL: autoload ("__octave_server__", "parallel_interface.oct", "remove");
23 
24 #include <octave/oct.h>
25 
26 #include "config.h"
27 
28 #include <octave/file-io.h>
29 #include <octave/sighandlers.h>
30 #include <octave/parse.h>
31 
32 #include <sys/socket.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <iostream>
36 #include <sys/stat.h>
37 #include <sys/poll.h>
38 #include <errno.h>
39 #include <netdb.h>
40 #include <sys/utsname.h>
41 #include <netinet/in.h> // reported necessary for FreeBSD-8
42 
43 #if HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46 
47 #include "parallel-gnutls.h"
48 
49 static
assert_file(std::string & path)50 int assert_file (std::string &path)
51 {
52   OCTAVE__SYS__FILE_STAT stat (path);
53 
54   if (! stat.is_reg ())
55     return -1;
56   else
57     return 0;
58 }
59 
60 void
reval_loop(octave_parallel_stream & cstr)61 reval_loop (octave_parallel_stream &cstr)
62 {
63   dsprintf ("reval, trying to write and read dummy\n");
64 
65   // send and read the final character of the initialization protocol
66   // to and from the client
67   char dummy = '\n';
68   cstr.get_ostream () << dummy;
69   dsprintf ("reval, wrote dummy (%i)\n", dummy);
70   cstr.get_istream () >> std::noskipws >> dummy;
71   dsprintf ("reval, read dummy (%i)\n", dummy);
72 
73   if (! cstr.good ())
74     {
75       _p_error ("could not finish initialization");
76       _exit (1);
77     }
78 
79   bool firsttime = true;
80 
81   dsprintf ("reval, entering loop\n");
82   while (true)
83     {
84       std::string s;
85 
86       if (firsttime)
87         {
88           dsprintf ("reval, setting command to 'rehash ()' at first repetition of loop\n");
89           s = "rehash ()\n"; // this avoided some problems
90           firsttime = false;
91         }
92       else
93         {
94           dsprintf ("reval loop, before network_recv_string\n");
95           if (cstr.network_recv_string (s))
96             {
97               _p_error ("error reading command");
98               _exit (1);
99             }
100           dsprintf ("reval loop, after successful network_recv_string\n");
101         }
102 
103       bool err;
104       int p_err;
105       SET_ERR (OCTAVE__EVAL_STRING (s, false, p_err, 0), err);
106       dsprintf ("reval loop, after evaluating string\n");
107 
108       uint32_t nl = 0;
109       if (err)
110         {
111           dsprintf ("reval loop, evaluation caused error\n");
112           nl = 1;
113         }
114       else if (p_err)
115         {
116           dsprintf ("reval loop, p_err was set\n");
117           nl = 1;
118         }
119       if (nl)
120         {
121           dsprintf ("reval loop, before sending error indication\n");
122           if (cstr.network_send_4byteint (nl, true))
123             {
124               _p_error ("error sending error code");
125               _exit (1);
126             }
127           dsprintf ("reval loop, after successful sending error indication\n");
128         }
129       else
130         {
131           dsprintf ("reval loop, no error, caring for Octave command number\n");
132           if (octave_completion_matches_called)
133             octave_completion_matches_called = false;
134           else
135             OCTAVE__COMMAND_EDITOR::increment_current_command_number ();
136           dsprintf ("reval loop, no error, after caring for Octave command number\n");
137         }
138     }
139 }
140 
141 DEFUN_DLD (__octave_server__, args, ,
142            "-*- texinfo -*-\n\
143 @deftypefn {Loadable Function} {} __octave_server__ ()\n\
144 Undocumented internal function.\n\
145 @end deftypefn")
146 {
147   // Contrary to what is done in the client side function connect.cc,
148   // it is not necessarily always explicitely cared here for closing
149   // of sockets and freeing of allocated memory in cases of
150   // error. Since this function exits or returns in case of an
151   // internal error, there is sometimes no need to do this. Returning
152   // (due to exceptions) should be as good as exiting, since the
153   // server is started in a way that leads to exiting Octave as soon
154   // as the server returns.
155   //
156   // Originally, it was thought that deallocation of anything which
157   // uses credentials has to be carefully provided here also, since
158   // the destructor of the ref-counted credentials must be called in
159   // the end to zero sensitive data before deallocation. This may not
160   // be true, since the OS should zero any memory page given to a new
161   // process.
162 
163   octave_value_list retval;
164 
165   std::string fname ("__octave_server__");
166 
167   dsprintf ("%s: just started\n", fname.c_str ());
168 
169   // arguments: asock, dsock, use_gnutls, cafile, hostname, server_no
170   if (args.length () != 6)
171     {
172       _p_error ("%s: six arguments required", fname.c_str ());
173       _exit (1);
174     }
175   // file descriptor for command connection (accepted socket)
176   int asock;
177   CHECK_ERROR_EXIT1 (asock = args(0).int_value (),
178                      "%s: could not convert argument 1 to int",
179                      fname.c_str ());
180   // file descriptor for data connection (socket in listening state)
181   int dsock;
182   CHECK_ERROR_EXIT1 (dsock = args(1).int_value (),
183                      "%s: could not convert argument 2 to int",
184                      fname.c_str ());
185   // use TLS
186   bool use_gnutls;
187   CHECK_ERROR_EXIT1 (use_gnutls = args(2).bool_value (),
188                      "%s: could not convert argument 3 to bool",
189                      fname.c_str ());
190   // custom authentication file
191   std::string cafile;
192   CHECK_ERROR_EXIT1 (cafile = args(3).string_value (),
193                      "%s: could not convert argument 4 to string",
194                      fname.c_str ());
195   // hostname
196   std::string hostname;
197   CHECK_ERROR_EXIT1 (hostname = args(4).string_value (),
198                      "%s: could not convert argument 5 to string",
199                      fname.c_str ());
200   // server number for debugging
201   int server_no;
202   CHECK_ERROR_EXIT1 (server_no = args(5).int_value (),
203                      "%s: could not convert argument 6 to int",
204                      fname.c_str ());
205 
206   // A negative integer might be sent as Octave data, and Octave
207   // doesn't care about coding of negative integers. (I know, there
208   // probably will never be a current C-compiler with something
209   // different than twos complement. But the C-standard allows for
210   // it.)
211   if (octave_parallel_stream::signed_int_rep ())
212     {
213       _p_error ("This machine doesn't seem to use twos complement as negative integer representation. If you want this to be supported, please file a bug report.");
214 
215       _exit (1);
216     }
217 
218   // avoid dumping octave_core if killed by a signal
219   OCTAVE__FEVAL ("sigterm_dumps_octave_core", octave_value (0), 0);
220   OCTAVE__FEVAL ("sighup_dumps_octave_core", octave_value (0), 0);
221 
222   dsprintf ("%i: Octave server before start of communication\n", server_no);
223 
224 #ifdef HAVE_LIBGNUTLS
225   struct __ccredguard
226   {
227     octave_parallel_gnutls_srp_client_credentials *__c;
__ccredguard__ccredguard228     __ccredguard (void) : __c (NULL) { }
~__ccredguard__ccredguard229     ~__ccredguard (void) { if (__c && ! __c->check_ref ()) delete __c; }
__get__ccredguard230     octave_parallel_gnutls_srp_client_credentials *__get (void)
231     { return __c; }
__set__ccredguard232     void __set (octave_parallel_gnutls_srp_client_credentials *__ic)
233     { __c = __ic; }
__release__ccredguard234     void __release (void) { __c = NULL; }
235   }  __ccg;
236   octave_parallel_gnutls_srp_client_credentials *ccred;
237 
238   struct __scredguard
239   {
240     octave_parallel_gnutls_srp_server_credentials *__c;
__scredguard__scredguard241     __scredguard (void) : __c (NULL) { }
~__scredguard__scredguard242     ~__scredguard (void) { if (__c && ! __c->check_ref ()) delete __c; }
__get__scredguard243     octave_parallel_gnutls_srp_server_credentials *__get (void)
244     { return __c; }
__set__scredguard245     void __set (octave_parallel_gnutls_srp_server_credentials *__ic)
246     { __c = __ic; }
__release__scredguard247     void __release (void) { __c = NULL; }
248   }  __scg;
249   octave_parallel_gnutls_srp_server_credentials *scred;
250 
251   if (use_gnutls)
252     {
253       gnutls_global_init ();
254 #ifdef HAVE_LIBGNUTLS_EXTRA
255       gnutls_global_init_extra ();  // for SRP
256       parallel_gnutls_set_mem_functions ();
257 #endif
258 
259       dsprintf ("%i: after initializing gnutls\n", server_no);
260 
261       // generate credentials
262       if (! cafile.length ())
263         {
264 #ifdef HAVE_OCTAVE_CONFIG_FCNS
265           std::string octave_home = octave::config::octave_home ();
266 #else
267           extern std::string Voctave_home;
268           std::string octave_home = Voctave_home;
269 #endif
270           cafile = octave_home +
271             OCTAVE__SYS__FILE_OPS::dir_sep_str () + "share" +
272             OCTAVE__SYS__FILE_OPS::dir_sep_str () + "octave" +
273             OCTAVE__SYS__FILE_OPS::dir_sep_str () +
274             "parallel-srp-data" +
275             OCTAVE__SYS__FILE_OPS::dir_sep_str () + "server" +
276             OCTAVE__SYS__FILE_OPS::dir_sep_str () + "passwd";
277           if (assert_file (cafile))
278             {
279               octave_value_list f_args (1);
280               f_args(0) = octave_value ("prefix");
281               octave_value_list f_ret;
282               CHECK_ERROR_EXIT1 (f_ret = OCTAVE__FEVAL ("pkg", f_args, 1),
283                                  "%s: could not get prefix from pkg",
284                                  fname.c_str ());
285               CHECK_ERROR_EXIT1 (cafile = f_ret(0).string_value (),
286                                  "%s: could not convert output of pkg ('prefix') to string)",
287                                  fname.c_str ());
288               cafile = cafile + OCTAVE__SYS__FILE_OPS::dir_sep_str () +
289                 "parallel-srp-data" +
290                 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "server" +
291                 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "passwd";
292               if (assert_file (cafile))
293                 {
294                   _p_error ("%s: no regular file found at default password file paths",
295                             fname.c_str ());
296                   _exit (1);
297                 }
298             }
299         }
300       else if (assert_file (cafile))
301         {
302           _p_error ("%s: no regular file found at password file path given by user",
303                     fname.c_str ());
304           _exit (1);
305         }
306       __scg.__set (scred =
307                    new octave_parallel_gnutls_srp_server_credentials
308                    (cafile));
309       dsprintf ("%i: after generating credentials\n", server_no);
310       if (! __scg.__get ()->check_cred ())
311         {
312           _p_error ("%s: could not create credentials",
313                     fname.c_str ());
314           _exit (1);
315         }
316     }
317 #endif // HAVE_LIBGNUTLS
318 
319   // determine own number of usable processor cores
320   uint32_t nproc = num_processors (NPROC_CURRENT);
321 
322   // The servers command stream will not be inserted into a
323   // connection object.
324   octave_parallel_streambuf *cmd_strb;
325 #ifdef HAVE_LIBGNUTLS
326   if (use_gnutls)
327     {
328       dsprintf ("%i: will generate gnutls streambuf\n", server_no);
329       cmd_strb = new octave_parallel_gnutls_streambuf
330         (asock, scred, true);
331       __scg.__release ();
332       dsprintf ("%i: generated gnutls streambuf\n", server_no);
333     }
334   else
335 #endif
336     cmd_strb = new octave_parallel_socket_streambuf (asock, true);
337   octave_parallel_stream cmd_str (cmd_strb);
338   if (! cmd_str.good ())
339     {
340       _p_error ("could not create command stream");
341       _exit (1);
342     }
343 
344   uint32_t nhosts;
345   cmd_str.network_recv_4byteint (nhosts);
346   dsprintf ("%i: received nhosts (%i), good: %i\n", server_no, nhosts, cmd_str.good ());
347 
348   cmd_str.network_send_4byteint (nproc, true);
349   dsprintf ("%i: sent nproc (%u), good: %i\n", server_no, nproc, cmd_str.good ());
350 
351   uint32_t me;
352   cmd_str.network_recv_4byteint (me);
353   dsprintf ("%i: received me (%i), good: %i\n", server_no, me, cmd_str.good ());
354 
355   std::string uuid;
356   cmd_str.network_recv_string (uuid);
357   dsprintf ("%i: received uuid (%s), good: %i\n", server_no, uuid.c_str (), cmd_str.good ());
358 
359   if (! cmd_str.good ()) // check before using the received 'nhosts'
360     {
361       _p_error ("communication error in initialization");
362       _exit (1);
363     }
364   Array<std::string> hosts (dim_vector (nhosts, 1));
365   for (uint32_t i = 0; i < nhosts; i++)
366     {
367       cmd_str.network_recv_string (hosts(i));
368       dsprintf ("%i: received hostname no %i (%s)\n", server_no, i, hosts(i).c_str ());
369     }
370 
371   dsprintf ("will now change name of error file\n");
372   std::string errname = std::string ("/tmp/octave_error-") + hostname.c_str ()
373     + "_" + uuid.c_str () + ".log";
374   struct stat s_stat;
375   if (stat (errname.c_str (), &s_stat) == 0)
376     {
377       std::string bakname ("/tmp/octave_error-");
378       bakname = bakname + hostname.c_str () +
379         "_" + uuid.c_str () + ".bak";
380       if (rename (errname.c_str (), bakname.c_str ()))
381         {
382           perror ("rename");
383           _exit (1);
384         }
385     }
386   if (! freopen (errname.c_str (), "w", stderr))
387     {
388       perror ("freopen");
389       _exit (1);
390     }
391 
392   std::string directory;
393   cmd_str.network_recv_string (directory);
394   dsprintf ("%i: received directory (%s)\n", server_no, directory.c_str ());
395 
396 #define BUFLEN 1024
397   struct __pwguard
398   {
399     char *__pw;
400     int __len;
__pwguard__pwguard401     __pwguard (int __alen) : __pw (new char[__alen]), __len (__alen) { }
__free__pwguard402     void __free (void)
403     {
404       if (__pw)
405         {
406           memset ((void *) __pw, 0, __len);
407           delete [] __pw;
408           __pw = NULL;
409         }
410     }
~__pwguard__pwguard411     ~__pwguard (void) { __free (); }
__get__pwguard412     char *__get (void) { return __pw; }
413   } __pwg (BUFLEN);
414   if (use_gnutls)
415     {
416       cmd_str.network_recv_string (__pwg.__get (), BUFLEN);
417       if (me == nhosts) // we won't need it
418         __pwg.__free ();
419       dsprintf ("%i: received password (%s)\n", server_no, __pwg.__get ());
420     }
421 
422   if (! cmd_str.good ())
423     {
424       _p_error ("communication error in initialization");
425       _exit (1);
426     }
427 
428   // client may shut down before starting data connections (if it was
429   // unable to establish all command connections)
430   struct pollfd pfd[2];
431   pfd[0].fd = asock;
432   pfd[0].events = POLLIN | POLLHUP; // POLLHUP meaningless here?
433   pfd[0].revents = 0;
434   pfd[1].fd = dsock;
435   pfd[1].events = POLLIN; // will be set if accept is possible
436   pfd[1].revents = 0;
437   if (poll (pfd, 2, -1) == -1)
438     {
439       _p_error ("error in poll()");
440       _exit (1);
441     }
442   if (pfd[0].revents)
443     {
444       _p_error ("unexpected event at command socket");
445       _exit (1);
446     }
447 
448   octave_parallel_network *network;
449   struct __netwguard
450   {
451     octave_parallel_network *__n;
__netwguard__netwguard452     __netwguard (octave_parallel_network *__an) : __n (__an)
453     { __n->get_ref (); }
~__netwguard__netwguard454     ~__netwguard (void) { if (__n->release_ref () <= 0) delete __n; }
455   } __ng (network = new octave_parallel_network (nhosts + 1));
456 
457   for (uint32_t i = 0; i < me; i++)
458     {
459       //      recv;
460 
461       dsprintf ("%i: trying to accept data connection, %i\n", server_no, i);
462       int not_connected = 1;
463       for (int j = 0; j < N_CONNECT_RETRIES; j++)
464         {
465           struct sockaddr_in rem_addr;
466           socklen_t addrlen = sizeof (rem_addr);
467           int adsock = accept (dsock, (sockaddr *) &rem_addr, &addrlen);
468           if (adsock == -1)
469             {
470               perror ("accept, data stream");
471               _exit (1);
472             }
473           dsprintf ("server %i, host %i, retry %i, accept successful\n", server_no, i, j);
474           if (addrlen > sizeof (rem_addr))
475             {
476               perror ("accept, data stream, address buffer to short");
477               _exit (1);
478             }
479           struct __sockguard
480           {
__sockguard__sockguard481             __sockguard (int __isock) { __sock = __isock; }
~__sockguard__sockguard482             ~__sockguard (void) { if (__sock > -1) close (__sock); }
__release__sockguard483             void __release (void) { __sock = -1; }
484             int __sock;
485           } __sockg (adsock);
486 #define HOSTNAMEBUFLEN 257
487           char peername[HOSTNAMEBUFLEN];
488           if (getnameinfo ((sockaddr *) &rem_addr, addrlen,
489                            (char *) peername, HOSTNAMEBUFLEN,
490                            NULL, 0, 0))
491             {
492               _p_error ("getnameinfo returned an error");
493               _exit (1);
494             }
495 
496           octave_parallel_connection *conn =
497             new octave_parallel_connection
498             (peername, true, uuid.c_str ());
499 
500           // you don't know the position to insert the connection
501           // (protecting it) yet, an exception may be thrown before
502           // you know it
503           struct __connguard
504           {
__connguard__connguard505             __connguard (octave_parallel_connection *__ipt)
506             { __pt = __ipt; }
~__connguard__connguard507             ~__connguard (void) { if (__pt) delete __pt; }
__set__connguard508             void __set (octave_parallel_connection *__ipt)
509             { __pt = __ipt; }
510             octave_parallel_connection *__pt;
__release__connguard511             void __release (void) { __pt = NULL; }
512           } __conng (conn);
513 
514 #ifdef HAVE_LIBGNUTLS
515           if (use_gnutls)
516             {
517               conn->insert_data_stream
518                 (new octave_parallel_stream
519                  (new octave_parallel_gnutls_streambuf
520                   (adsock, scred, true)));
521               dsprintf ("server %i, host %i, retry %i, generated gnutls streambuf\n", server_no, i, j);
522             }
523           else
524 #endif
525             conn->insert_data_stream
526               (new octave_parallel_stream
527                (new octave_parallel_socket_streambuf (adsock, true)));
528 
529           __sockg.__release ();
530 
531           if (! conn->get_data_stream ()->good ())
532             {
533               _p_error ("could not create data stream to %s", peername);
534               _exit (1);
535             }
536 
537           std::string duuid;
538           conn->get_data_stream ()->network_recv_string (duuid);
539           dsprintf ("server %i, host %i, retry %i, received uuid (%s)\n", server_no, i, j, duuid.c_str ());
540 
541           uint32_t host_n;
542           conn->get_data_stream ()->network_recv_4byteint (host_n);
543           dsprintf ("server %i, host %i, retry %i, received host_n (%i)\n", server_no, i, j, host_n);
544 
545           if (! conn->get_data_stream ()->good ())
546             {
547               _p_error ("communication error in initialization");
548               _exit (1);
549             }
550 
551           if (uuid.compare (duuid))
552             {
553               // a different call to 'connect', i.e. a different network
554               conn->get_data_stream ()->network_send_4byteint (-1);
555               if (conn->delete_data_stream ())
556                 {
557                   _p_error ("could not delete data stream");
558                   _exit (1);
559                 }
560               dsprintf ("server %i, host %i, retry %i, sent result -1\n", server_no, i, j);
561               sleep (1);
562             }
563           else if (me <= host_n || network->is_connection (host_n))
564             {
565               // we should never get here (since duuid == uuid)
566               conn->get_data_stream ()->network_send_4byteint (-2);
567               _p_error ("server %i, host %i, retry %i, internal error, unexpected host id %i, is_connection(host id): %i", server_no, i, j, host_n,
568                         network->is_connection (host_n));
569               _exit (1);
570             }
571           else
572             {
573               conn->get_data_stream ()->network_send_4byteint (0);
574               int err = conn->connection_read_header ();
575               minimal_write_header
576                 (conn->get_data_stream ()->get_ostream ());
577               if (err || ! conn->get_data_stream ()->good ())
578                 {
579                   _p_error ("communication error in initialization");
580                   _exit (1);
581                 }
582               network->insert_connection (conn, host_n);
583               __conng.__release ();
584               not_connected = 0;
585               dsprintf ("server %i, host %i, retry %i, good result sent, header read, header written and data stream good, breaking\n", server_no, i, j);
586               break;
587             }
588         }
589       if (not_connected)
590         {
591           _p_error ("maximum number of connect retries exceeded");
592           _exit (-1);
593         }
594     }
595 
596   close (dsock);
597 
598 
599   // a pseudo-connection, representing the own node in the network
600   octave_parallel_connection *conn =
601     new octave_parallel_connection (true, uuid.c_str ());
602 
603   network->insert_connection (conn, me);
604   dsprintf ("server %i inserted pseudoconnection at %i\n", server_no, me);
605 
606   // store number of available processor cores at the own machine,
607   // although this is not necessary (but we have this information
608   // here...)
609   conn->set_nproc (nproc);
610 
611 #ifdef HAVE_LIBGNUTLS
612   if (use_gnutls && me < nhosts)
613     {
614       const char *username =
615         static_cast<octave_parallel_gnutls_streambuf*>(cmd_strb)->
616         server_get_username ();
617       dsprintf ("server %i determined username %s from command stream, will now allocate client credentials (for data connections) with this username and password %s\n", server_no, username, __pwg.__get ());
618 
619       __ccg.__set (ccred =
620                    new octave_parallel_gnutls_srp_client_credentials
621                    (username, __pwg.__get ()));
622     }
623 #endif
624 
625 
626   for (uint32_t i = me + 1; i <= nhosts; i++)
627     {
628       // connect;
629       dsprintf ("connect, server %i, host %i\n", server_no, i);
630 
631       struct addrinfo *ai = NULL, hints;
632       memset ((void *) &hints, 0, sizeof (hints));
633       hints.ai_family = AF_INET;
634       hints.ai_socktype = SOCK_STREAM;
635       hints.ai_protocol = 0;
636       hints.ai_flags = 0;
637       if (getaddrinfo (hosts(i - 1).c_str (), "12501", &hints, &ai))
638         {
639           _p_error ("getaddrinfo returned an error");
640           _exit (1);
641         }
642       struct __aiguard
643       {
__aiguard__aiguard644         __aiguard (struct addrinfo *__iai) { __ai = __iai; }
~__aiguard__aiguard645         ~__aiguard (void) { if (__ai) freeaddrinfo (__ai); }
646         struct addrinfo *__ai;
647       } __aig (ai);
648 
649       int not_connected = 1;
650       for (int j = 0; j < N_CONNECT_RETRIES; j++)
651         {
652           int dsock = socket (PF_INET, SOCK_STREAM, 0);
653           if (dsock == -1)
654             {
655               perror ("socket");
656               _exit (1);
657             }
658           struct __sockguard
659           {
__sockguard__sockguard660             __sockguard (int __isock) { __sock = __isock; }
~__sockguard__sockguard661             ~__sockguard (void) { if (__sock > -1) close (__sock); }
__release__sockguard662             void __release (void) { __sock = -1; }
663             int __sock;
664           } __sockg (dsock);
665 
666           if (connect (dsock, ai->ai_addr, ai->ai_addrlen) == 0)
667             {
668               dsprintf ("connect, server %i, host %i, retry %i, connect succesful\n", server_no, i, j);
669               octave_parallel_connection *conn =
670                 new octave_parallel_connection
671                 (hosts(i - 1).c_str (), true, uuid.c_str ());
672               network->insert_connection (conn, i);
673 
674 #ifdef HAVE_LIBGNUTLS
675               if (use_gnutls)
676                 {
677                   conn->insert_data_stream
678                     (new octave_parallel_stream
679                      (new octave_parallel_gnutls_streambuf
680                       (dsock, ccred, false)));
681                   __ccg.__release ();
682                   dsprintf ("connect, server %i, host %i, retry %i, generated gnutls streambuf\n", server_no, i, j);
683                 }
684               else
685 #endif
686                 conn->insert_data_stream
687                   (new octave_parallel_stream
688                    (new octave_parallel_socket_streambuf (dsock, false)));
689               __sockg.__release ();
690               if (! conn->get_data_stream ()->good ())
691                 {
692                   _p_error ("could not create data stream to %s",
693                             hosts(i - 1).c_str ());
694                   _exit (1);
695                 }
696 
697               conn->get_data_stream ()->
698                 network_send_string (uuid.c_str ());
699               dsprintf ("connect, server %i, host %i, retry %i, uuid written (%s)\n", server_no, i, j, uuid.c_str ());
700 
701               conn->get_data_stream ()->network_send_4byteint (me,
702                                                                        true);
703               dsprintf ("connect, server %i, host %i, retry %i, 'me' written (%i)\n", server_no, i, j, me);
704 
705               int32_t res;
706               conn->get_data_stream ()->network_recv_4byteint (res);
707 
708               if (! conn->get_data_stream ()->good ())
709                 {
710                   _p_error ("communication error in initialization");
711                   _exit (1);
712                 }
713 
714               if (res == -1)
715                 {
716                   if (conn->delete_data_stream ())
717                     {
718                       _p_error ("could not delete data stream");
719                       _exit (1);
720                     }
721 
722                   dsprintf ("connect, server %i, host %i, retry %i, sleeping after bad result\n", server_no, i, j);
723                   usleep (5000);
724                 }
725               else if (res)
726                 {
727                   _p_error ("unexpected error in remote server");
728                   _exit (1);
729                 }
730               else
731                 {
732                   minimal_write_header
733                     (conn->get_data_stream ()->get_ostream ());
734                   if (conn->connection_read_header () ||
735                       ! conn->get_data_stream ()->good ())
736                     {
737                       _p_error ("communication error in initialization");
738                       _exit (1);
739                     }
740                   not_connected = 0;
741                   dsprintf ("connect, server %i, host %i, retry %i, good result read, header written and read and datastream good, breaking\n", server_no, i, j);
742                   break;
743                 }
744             }
745           else if (errno != ECONNREFUSED && errno != EINTR)
746             {
747               perror ("connect");
748               break;
749             }
750           else
751             usleep (5000);
752         }
753 
754       if (not_connected)
755         {
756           _p_error ("unable to connect to %s", hosts(i - 1).c_str ());
757           _exit (1);
758         }
759     }
760 
761   octave_parallel_connections *conns = new octave_parallel_connections
762     (network, uuid.c_str (), true);
763   octave_value sockets (conns);
764 
765   __pwg.__free ();
766 
767 #ifdef HAVE_OCTAVE_INTERPRETER_H
768   OCTAVE__INTERPRETER::the_interpreter () -> interactive (false);
769 #else
770   interactive = false;
771 #endif
772 
773   // install 'sockets' as Octave variable
774   OCTAVE__INTERPRETER__SYMBOL_TABLE__ASSIGN ("sockets", sockets);
775   dsprintf ("'sockets' installed\n");
776 
777   int cd_ok = OCTAVE__SYS__ENV::chdir (directory.c_str ());
778   if (! cd_ok)
779     {
780       OCTAVE__SYS__ENV::chdir ("/tmp");
781       dsprintf ("performed chdir to /tmp\n");
782     }
783   else
784     dsprintf ("performed chdir to %s\n", directory.c_str ());
785 
786   dsprintf ("calling function reval_loop\n");
787   reval_loop (cmd_str); // does not return
788 
789   return retval;
790 }
791