/*

Copyright (C) 2002 Hayato Fujiwara <h_fujiwara@users.sourceforge.net>
Copyright (C) 2010-2020 Olaf Till <i7tiol@t-online.de>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.

*/
20
21 // PKG_ADD: autoload ("__octave_server__", "parallel_interface.oct");
22 // PKG_DEL: autoload ("__octave_server__", "parallel_interface.oct", "remove");
23
24 #include <octave/oct.h>
25
26 #include "config.h"
27
28 #include <octave/file-io.h>
29 #include <octave/sighandlers.h>
30 #include <octave/parse.h>
31
32 #include <sys/socket.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <iostream>
36 #include <sys/stat.h>
37 #include <sys/poll.h>
38 #include <errno.h>
39 #include <netdb.h>
40 #include <sys/utsname.h>
41 #include <netinet/in.h> // reported necessary for FreeBSD-8
42
43 #if HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46
47 #include "parallel-gnutls.h"
48
49 static
assert_file(std::string & path)50 int assert_file (std::string &path)
51 {
52 OCTAVE__SYS__FILE_STAT stat (path);
53
54 if (! stat.is_reg ())
55 return -1;
56 else
57 return 0;
58 }
59
60 void
reval_loop(octave_parallel_stream & cstr)61 reval_loop (octave_parallel_stream &cstr)
62 {
63 dsprintf ("reval, trying to write and read dummy\n");
64
65 // send and read the final character of the initialization protocol
66 // to and from the client
67 char dummy = '\n';
68 cstr.get_ostream () << dummy;
69 dsprintf ("reval, wrote dummy (%i)\n", dummy);
70 cstr.get_istream () >> std::noskipws >> dummy;
71 dsprintf ("reval, read dummy (%i)\n", dummy);
72
73 if (! cstr.good ())
74 {
75 _p_error ("could not finish initialization");
76 _exit (1);
77 }
78
79 bool firsttime = true;
80
81 dsprintf ("reval, entering loop\n");
82 while (true)
83 {
84 std::string s;
85
86 if (firsttime)
87 {
88 dsprintf ("reval, setting command to 'rehash ()' at first repetition of loop\n");
89 s = "rehash ()\n"; // this avoided some problems
90 firsttime = false;
91 }
92 else
93 {
94 dsprintf ("reval loop, before network_recv_string\n");
95 if (cstr.network_recv_string (s))
96 {
97 _p_error ("error reading command");
98 _exit (1);
99 }
100 dsprintf ("reval loop, after successful network_recv_string\n");
101 }
102
103 bool err;
104 int p_err;
105 SET_ERR (OCTAVE__EVAL_STRING (s, false, p_err, 0), err);
106 dsprintf ("reval loop, after evaluating string\n");
107
108 uint32_t nl = 0;
109 if (err)
110 {
111 dsprintf ("reval loop, evaluation caused error\n");
112 nl = 1;
113 }
114 else if (p_err)
115 {
116 dsprintf ("reval loop, p_err was set\n");
117 nl = 1;
118 }
119 if (nl)
120 {
121 dsprintf ("reval loop, before sending error indication\n");
122 if (cstr.network_send_4byteint (nl, true))
123 {
124 _p_error ("error sending error code");
125 _exit (1);
126 }
127 dsprintf ("reval loop, after successful sending error indication\n");
128 }
129 else
130 {
131 dsprintf ("reval loop, no error, caring for Octave command number\n");
132 if (octave_completion_matches_called)
133 octave_completion_matches_called = false;
134 else
135 OCTAVE__COMMAND_EDITOR::increment_current_command_number ();
136 dsprintf ("reval loop, no error, after caring for Octave command number\n");
137 }
138 }
139 }
140
141 DEFUN_DLD (__octave_server__, args, ,
142 "-*- texinfo -*-\n\
143 @deftypefn {Loadable Function} {} __octave_server__ ()\n\
144 Undocumented internal function.\n\
145 @end deftypefn")
146 {
147 // Contrary to what is done in the client side function connect.cc,
148 // it is not necessarily always explicitely cared here for closing
149 // of sockets and freeing of allocated memory in cases of
150 // error. Since this function exits or returns in case of an
151 // internal error, there is sometimes no need to do this. Returning
152 // (due to exceptions) should be as good as exiting, since the
153 // server is started in a way that leads to exiting Octave as soon
154 // as the server returns.
155 //
156 // Originally, it was thought that deallocation of anything which
157 // uses credentials has to be carefully provided here also, since
158 // the destructor of the ref-counted credentials must be called in
159 // the end to zero sensitive data before deallocation. This may not
160 // be true, since the OS should zero any memory page given to a new
161 // process.
162
163 octave_value_list retval;
164
165 std::string fname ("__octave_server__");
166
167 dsprintf ("%s: just started\n", fname.c_str ());
168
169 // arguments: asock, dsock, use_gnutls, cafile, hostname, server_no
170 if (args.length () != 6)
171 {
172 _p_error ("%s: six arguments required", fname.c_str ());
173 _exit (1);
174 }
175 // file descriptor for command connection (accepted socket)
176 int asock;
177 CHECK_ERROR_EXIT1 (asock = args(0).int_value (),
178 "%s: could not convert argument 1 to int",
179 fname.c_str ());
180 // file descriptor for data connection (socket in listening state)
181 int dsock;
182 CHECK_ERROR_EXIT1 (dsock = args(1).int_value (),
183 "%s: could not convert argument 2 to int",
184 fname.c_str ());
185 // use TLS
186 bool use_gnutls;
187 CHECK_ERROR_EXIT1 (use_gnutls = args(2).bool_value (),
188 "%s: could not convert argument 3 to bool",
189 fname.c_str ());
190 // custom authentication file
191 std::string cafile;
192 CHECK_ERROR_EXIT1 (cafile = args(3).string_value (),
193 "%s: could not convert argument 4 to string",
194 fname.c_str ());
195 // hostname
196 std::string hostname;
197 CHECK_ERROR_EXIT1 (hostname = args(4).string_value (),
198 "%s: could not convert argument 5 to string",
199 fname.c_str ());
200 // server number for debugging
201 int server_no;
202 CHECK_ERROR_EXIT1 (server_no = args(5).int_value (),
203 "%s: could not convert argument 6 to int",
204 fname.c_str ());
205
206 // A negative integer might be sent as Octave data, and Octave
207 // doesn't care about coding of negative integers. (I know, there
208 // probably will never be a current C-compiler with something
209 // different than twos complement. But the C-standard allows for
210 // it.)
211 if (octave_parallel_stream::signed_int_rep ())
212 {
213 _p_error ("This machine doesn't seem to use twos complement as negative integer representation. If you want this to be supported, please file a bug report.");
214
215 _exit (1);
216 }
217
218 // avoid dumping octave_core if killed by a signal
219 OCTAVE__FEVAL ("sigterm_dumps_octave_core", octave_value (0), 0);
220 OCTAVE__FEVAL ("sighup_dumps_octave_core", octave_value (0), 0);
221
222 dsprintf ("%i: Octave server before start of communication\n", server_no);
223
224 #ifdef HAVE_LIBGNUTLS
225 struct __ccredguard
226 {
227 octave_parallel_gnutls_srp_client_credentials *__c;
__ccredguard__ccredguard228 __ccredguard (void) : __c (NULL) { }
~__ccredguard__ccredguard229 ~__ccredguard (void) { if (__c && ! __c->check_ref ()) delete __c; }
__get__ccredguard230 octave_parallel_gnutls_srp_client_credentials *__get (void)
231 { return __c; }
__set__ccredguard232 void __set (octave_parallel_gnutls_srp_client_credentials *__ic)
233 { __c = __ic; }
__release__ccredguard234 void __release (void) { __c = NULL; }
235 } __ccg;
236 octave_parallel_gnutls_srp_client_credentials *ccred;
237
238 struct __scredguard
239 {
240 octave_parallel_gnutls_srp_server_credentials *__c;
__scredguard__scredguard241 __scredguard (void) : __c (NULL) { }
~__scredguard__scredguard242 ~__scredguard (void) { if (__c && ! __c->check_ref ()) delete __c; }
__get__scredguard243 octave_parallel_gnutls_srp_server_credentials *__get (void)
244 { return __c; }
__set__scredguard245 void __set (octave_parallel_gnutls_srp_server_credentials *__ic)
246 { __c = __ic; }
__release__scredguard247 void __release (void) { __c = NULL; }
248 } __scg;
249 octave_parallel_gnutls_srp_server_credentials *scred;
250
251 if (use_gnutls)
252 {
253 gnutls_global_init ();
254 #ifdef HAVE_LIBGNUTLS_EXTRA
255 gnutls_global_init_extra (); // for SRP
256 parallel_gnutls_set_mem_functions ();
257 #endif
258
259 dsprintf ("%i: after initializing gnutls\n", server_no);
260
261 // generate credentials
262 if (! cafile.length ())
263 {
264 #ifdef HAVE_OCTAVE_CONFIG_FCNS
265 std::string octave_home = octave::config::octave_home ();
266 #else
267 extern std::string Voctave_home;
268 std::string octave_home = Voctave_home;
269 #endif
270 cafile = octave_home +
271 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "share" +
272 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "octave" +
273 OCTAVE__SYS__FILE_OPS::dir_sep_str () +
274 "parallel-srp-data" +
275 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "server" +
276 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "passwd";
277 if (assert_file (cafile))
278 {
279 octave_value_list f_args (1);
280 f_args(0) = octave_value ("prefix");
281 octave_value_list f_ret;
282 CHECK_ERROR_EXIT1 (f_ret = OCTAVE__FEVAL ("pkg", f_args, 1),
283 "%s: could not get prefix from pkg",
284 fname.c_str ());
285 CHECK_ERROR_EXIT1 (cafile = f_ret(0).string_value (),
286 "%s: could not convert output of pkg ('prefix') to string)",
287 fname.c_str ());
288 cafile = cafile + OCTAVE__SYS__FILE_OPS::dir_sep_str () +
289 "parallel-srp-data" +
290 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "server" +
291 OCTAVE__SYS__FILE_OPS::dir_sep_str () + "passwd";
292 if (assert_file (cafile))
293 {
294 _p_error ("%s: no regular file found at default password file paths",
295 fname.c_str ());
296 _exit (1);
297 }
298 }
299 }
300 else if (assert_file (cafile))
301 {
302 _p_error ("%s: no regular file found at password file path given by user",
303 fname.c_str ());
304 _exit (1);
305 }
306 __scg.__set (scred =
307 new octave_parallel_gnutls_srp_server_credentials
308 (cafile));
309 dsprintf ("%i: after generating credentials\n", server_no);
310 if (! __scg.__get ()->check_cred ())
311 {
312 _p_error ("%s: could not create credentials",
313 fname.c_str ());
314 _exit (1);
315 }
316 }
317 #endif // HAVE_LIBGNUTLS
318
319 // determine own number of usable processor cores
320 uint32_t nproc = num_processors (NPROC_CURRENT);
321
322 // The servers command stream will not be inserted into a
323 // connection object.
324 octave_parallel_streambuf *cmd_strb;
325 #ifdef HAVE_LIBGNUTLS
326 if (use_gnutls)
327 {
328 dsprintf ("%i: will generate gnutls streambuf\n", server_no);
329 cmd_strb = new octave_parallel_gnutls_streambuf
330 (asock, scred, true);
331 __scg.__release ();
332 dsprintf ("%i: generated gnutls streambuf\n", server_no);
333 }
334 else
335 #endif
336 cmd_strb = new octave_parallel_socket_streambuf (asock, true);
337 octave_parallel_stream cmd_str (cmd_strb);
338 if (! cmd_str.good ())
339 {
340 _p_error ("could not create command stream");
341 _exit (1);
342 }
343
344 uint32_t nhosts;
345 cmd_str.network_recv_4byteint (nhosts);
346 dsprintf ("%i: received nhosts (%i), good: %i\n", server_no, nhosts, cmd_str.good ());
347
348 cmd_str.network_send_4byteint (nproc, true);
349 dsprintf ("%i: sent nproc (%u), good: %i\n", server_no, nproc, cmd_str.good ());
350
351 uint32_t me;
352 cmd_str.network_recv_4byteint (me);
353 dsprintf ("%i: received me (%i), good: %i\n", server_no, me, cmd_str.good ());
354
355 std::string uuid;
356 cmd_str.network_recv_string (uuid);
357 dsprintf ("%i: received uuid (%s), good: %i\n", server_no, uuid.c_str (), cmd_str.good ());
358
359 if (! cmd_str.good ()) // check before using the received 'nhosts'
360 {
361 _p_error ("communication error in initialization");
362 _exit (1);
363 }
364 Array<std::string> hosts (dim_vector (nhosts, 1));
365 for (uint32_t i = 0; i < nhosts; i++)
366 {
367 cmd_str.network_recv_string (hosts(i));
368 dsprintf ("%i: received hostname no %i (%s)\n", server_no, i, hosts(i).c_str ());
369 }
370
371 dsprintf ("will now change name of error file\n");
372 std::string errname = std::string ("/tmp/octave_error-") + hostname.c_str ()
373 + "_" + uuid.c_str () + ".log";
374 struct stat s_stat;
375 if (stat (errname.c_str (), &s_stat) == 0)
376 {
377 std::string bakname ("/tmp/octave_error-");
378 bakname = bakname + hostname.c_str () +
379 "_" + uuid.c_str () + ".bak";
380 if (rename (errname.c_str (), bakname.c_str ()))
381 {
382 perror ("rename");
383 _exit (1);
384 }
385 }
386 if (! freopen (errname.c_str (), "w", stderr))
387 {
388 perror ("freopen");
389 _exit (1);
390 }
391
392 std::string directory;
393 cmd_str.network_recv_string (directory);
394 dsprintf ("%i: received directory (%s)\n", server_no, directory.c_str ());
395
396 #define BUFLEN 1024
397 struct __pwguard
398 {
399 char *__pw;
400 int __len;
__pwguard__pwguard401 __pwguard (int __alen) : __pw (new char[__alen]), __len (__alen) { }
__free__pwguard402 void __free (void)
403 {
404 if (__pw)
405 {
406 memset ((void *) __pw, 0, __len);
407 delete [] __pw;
408 __pw = NULL;
409 }
410 }
~__pwguard__pwguard411 ~__pwguard (void) { __free (); }
__get__pwguard412 char *__get (void) { return __pw; }
413 } __pwg (BUFLEN);
414 if (use_gnutls)
415 {
416 cmd_str.network_recv_string (__pwg.__get (), BUFLEN);
417 if (me == nhosts) // we won't need it
418 __pwg.__free ();
419 dsprintf ("%i: received password (%s)\n", server_no, __pwg.__get ());
420 }
421
422 if (! cmd_str.good ())
423 {
424 _p_error ("communication error in initialization");
425 _exit (1);
426 }
427
428 // client may shut down before starting data connections (if it was
429 // unable to establish all command connections)
430 struct pollfd pfd[2];
431 pfd[0].fd = asock;
432 pfd[0].events = POLLIN | POLLHUP; // POLLHUP meaningless here?
433 pfd[0].revents = 0;
434 pfd[1].fd = dsock;
435 pfd[1].events = POLLIN; // will be set if accept is possible
436 pfd[1].revents = 0;
437 if (poll (pfd, 2, -1) == -1)
438 {
439 _p_error ("error in poll()");
440 _exit (1);
441 }
442 if (pfd[0].revents)
443 {
444 _p_error ("unexpected event at command socket");
445 _exit (1);
446 }
447
448 octave_parallel_network *network;
449 struct __netwguard
450 {
451 octave_parallel_network *__n;
__netwguard__netwguard452 __netwguard (octave_parallel_network *__an) : __n (__an)
453 { __n->get_ref (); }
~__netwguard__netwguard454 ~__netwguard (void) { if (__n->release_ref () <= 0) delete __n; }
455 } __ng (network = new octave_parallel_network (nhosts + 1));
456
457 for (uint32_t i = 0; i < me; i++)
458 {
459 // recv;
460
461 dsprintf ("%i: trying to accept data connection, %i\n", server_no, i);
462 int not_connected = 1;
463 for (int j = 0; j < N_CONNECT_RETRIES; j++)
464 {
465 struct sockaddr_in rem_addr;
466 socklen_t addrlen = sizeof (rem_addr);
467 int adsock = accept (dsock, (sockaddr *) &rem_addr, &addrlen);
468 if (adsock == -1)
469 {
470 perror ("accept, data stream");
471 _exit (1);
472 }
473 dsprintf ("server %i, host %i, retry %i, accept successful\n", server_no, i, j);
474 if (addrlen > sizeof (rem_addr))
475 {
476 perror ("accept, data stream, address buffer to short");
477 _exit (1);
478 }
479 struct __sockguard
480 {
__sockguard__sockguard481 __sockguard (int __isock) { __sock = __isock; }
~__sockguard__sockguard482 ~__sockguard (void) { if (__sock > -1) close (__sock); }
__release__sockguard483 void __release (void) { __sock = -1; }
484 int __sock;
485 } __sockg (adsock);
486 #define HOSTNAMEBUFLEN 257
487 char peername[HOSTNAMEBUFLEN];
488 if (getnameinfo ((sockaddr *) &rem_addr, addrlen,
489 (char *) peername, HOSTNAMEBUFLEN,
490 NULL, 0, 0))
491 {
492 _p_error ("getnameinfo returned an error");
493 _exit (1);
494 }
495
496 octave_parallel_connection *conn =
497 new octave_parallel_connection
498 (peername, true, uuid.c_str ());
499
500 // you don't know the position to insert the connection
501 // (protecting it) yet, an exception may be thrown before
502 // you know it
503 struct __connguard
504 {
__connguard__connguard505 __connguard (octave_parallel_connection *__ipt)
506 { __pt = __ipt; }
~__connguard__connguard507 ~__connguard (void) { if (__pt) delete __pt; }
__set__connguard508 void __set (octave_parallel_connection *__ipt)
509 { __pt = __ipt; }
510 octave_parallel_connection *__pt;
__release__connguard511 void __release (void) { __pt = NULL; }
512 } __conng (conn);
513
514 #ifdef HAVE_LIBGNUTLS
515 if (use_gnutls)
516 {
517 conn->insert_data_stream
518 (new octave_parallel_stream
519 (new octave_parallel_gnutls_streambuf
520 (adsock, scred, true)));
521 dsprintf ("server %i, host %i, retry %i, generated gnutls streambuf\n", server_no, i, j);
522 }
523 else
524 #endif
525 conn->insert_data_stream
526 (new octave_parallel_stream
527 (new octave_parallel_socket_streambuf (adsock, true)));
528
529 __sockg.__release ();
530
531 if (! conn->get_data_stream ()->good ())
532 {
533 _p_error ("could not create data stream to %s", peername);
534 _exit (1);
535 }
536
537 std::string duuid;
538 conn->get_data_stream ()->network_recv_string (duuid);
539 dsprintf ("server %i, host %i, retry %i, received uuid (%s)\n", server_no, i, j, duuid.c_str ());
540
541 uint32_t host_n;
542 conn->get_data_stream ()->network_recv_4byteint (host_n);
543 dsprintf ("server %i, host %i, retry %i, received host_n (%i)\n", server_no, i, j, host_n);
544
545 if (! conn->get_data_stream ()->good ())
546 {
547 _p_error ("communication error in initialization");
548 _exit (1);
549 }
550
551 if (uuid.compare (duuid))
552 {
553 // a different call to 'connect', i.e. a different network
554 conn->get_data_stream ()->network_send_4byteint (-1);
555 if (conn->delete_data_stream ())
556 {
557 _p_error ("could not delete data stream");
558 _exit (1);
559 }
560 dsprintf ("server %i, host %i, retry %i, sent result -1\n", server_no, i, j);
561 sleep (1);
562 }
563 else if (me <= host_n || network->is_connection (host_n))
564 {
565 // we should never get here (since duuid == uuid)
566 conn->get_data_stream ()->network_send_4byteint (-2);
567 _p_error ("server %i, host %i, retry %i, internal error, unexpected host id %i, is_connection(host id): %i", server_no, i, j, host_n,
568 network->is_connection (host_n));
569 _exit (1);
570 }
571 else
572 {
573 conn->get_data_stream ()->network_send_4byteint (0);
574 int err = conn->connection_read_header ();
575 minimal_write_header
576 (conn->get_data_stream ()->get_ostream ());
577 if (err || ! conn->get_data_stream ()->good ())
578 {
579 _p_error ("communication error in initialization");
580 _exit (1);
581 }
582 network->insert_connection (conn, host_n);
583 __conng.__release ();
584 not_connected = 0;
585 dsprintf ("server %i, host %i, retry %i, good result sent, header read, header written and data stream good, breaking\n", server_no, i, j);
586 break;
587 }
588 }
589 if (not_connected)
590 {
591 _p_error ("maximum number of connect retries exceeded");
592 _exit (-1);
593 }
594 }
595
596 close (dsock);
597
598
599 // a pseudo-connection, representing the own node in the network
600 octave_parallel_connection *conn =
601 new octave_parallel_connection (true, uuid.c_str ());
602
603 network->insert_connection (conn, me);
604 dsprintf ("server %i inserted pseudoconnection at %i\n", server_no, me);
605
606 // store number of available processor cores at the own machine,
607 // although this is not necessary (but we have this information
608 // here...)
609 conn->set_nproc (nproc);
610
611 #ifdef HAVE_LIBGNUTLS
612 if (use_gnutls && me < nhosts)
613 {
614 const char *username =
615 static_cast<octave_parallel_gnutls_streambuf*>(cmd_strb)->
616 server_get_username ();
617 dsprintf ("server %i determined username %s from command stream, will now allocate client credentials (for data connections) with this username and password %s\n", server_no, username, __pwg.__get ());
618
619 __ccg.__set (ccred =
620 new octave_parallel_gnutls_srp_client_credentials
621 (username, __pwg.__get ()));
622 }
623 #endif
624
625
626 for (uint32_t i = me + 1; i <= nhosts; i++)
627 {
628 // connect;
629 dsprintf ("connect, server %i, host %i\n", server_no, i);
630
631 struct addrinfo *ai = NULL, hints;
632 memset ((void *) &hints, 0, sizeof (hints));
633 hints.ai_family = AF_INET;
634 hints.ai_socktype = SOCK_STREAM;
635 hints.ai_protocol = 0;
636 hints.ai_flags = 0;
637 if (getaddrinfo (hosts(i - 1).c_str (), "12501", &hints, &ai))
638 {
639 _p_error ("getaddrinfo returned an error");
640 _exit (1);
641 }
642 struct __aiguard
643 {
__aiguard__aiguard644 __aiguard (struct addrinfo *__iai) { __ai = __iai; }
~__aiguard__aiguard645 ~__aiguard (void) { if (__ai) freeaddrinfo (__ai); }
646 struct addrinfo *__ai;
647 } __aig (ai);
648
649 int not_connected = 1;
650 for (int j = 0; j < N_CONNECT_RETRIES; j++)
651 {
652 int dsock = socket (PF_INET, SOCK_STREAM, 0);
653 if (dsock == -1)
654 {
655 perror ("socket");
656 _exit (1);
657 }
658 struct __sockguard
659 {
__sockguard__sockguard660 __sockguard (int __isock) { __sock = __isock; }
~__sockguard__sockguard661 ~__sockguard (void) { if (__sock > -1) close (__sock); }
__release__sockguard662 void __release (void) { __sock = -1; }
663 int __sock;
664 } __sockg (dsock);
665
666 if (connect (dsock, ai->ai_addr, ai->ai_addrlen) == 0)
667 {
668 dsprintf ("connect, server %i, host %i, retry %i, connect succesful\n", server_no, i, j);
669 octave_parallel_connection *conn =
670 new octave_parallel_connection
671 (hosts(i - 1).c_str (), true, uuid.c_str ());
672 network->insert_connection (conn, i);
673
674 #ifdef HAVE_LIBGNUTLS
675 if (use_gnutls)
676 {
677 conn->insert_data_stream
678 (new octave_parallel_stream
679 (new octave_parallel_gnutls_streambuf
680 (dsock, ccred, false)));
681 __ccg.__release ();
682 dsprintf ("connect, server %i, host %i, retry %i, generated gnutls streambuf\n", server_no, i, j);
683 }
684 else
685 #endif
686 conn->insert_data_stream
687 (new octave_parallel_stream
688 (new octave_parallel_socket_streambuf (dsock, false)));
689 __sockg.__release ();
690 if (! conn->get_data_stream ()->good ())
691 {
692 _p_error ("could not create data stream to %s",
693 hosts(i - 1).c_str ());
694 _exit (1);
695 }
696
697 conn->get_data_stream ()->
698 network_send_string (uuid.c_str ());
699 dsprintf ("connect, server %i, host %i, retry %i, uuid written (%s)\n", server_no, i, j, uuid.c_str ());
700
701 conn->get_data_stream ()->network_send_4byteint (me,
702 true);
703 dsprintf ("connect, server %i, host %i, retry %i, 'me' written (%i)\n", server_no, i, j, me);
704
705 int32_t res;
706 conn->get_data_stream ()->network_recv_4byteint (res);
707
708 if (! conn->get_data_stream ()->good ())
709 {
710 _p_error ("communication error in initialization");
711 _exit (1);
712 }
713
714 if (res == -1)
715 {
716 if (conn->delete_data_stream ())
717 {
718 _p_error ("could not delete data stream");
719 _exit (1);
720 }
721
722 dsprintf ("connect, server %i, host %i, retry %i, sleeping after bad result\n", server_no, i, j);
723 usleep (5000);
724 }
725 else if (res)
726 {
727 _p_error ("unexpected error in remote server");
728 _exit (1);
729 }
730 else
731 {
732 minimal_write_header
733 (conn->get_data_stream ()->get_ostream ());
734 if (conn->connection_read_header () ||
735 ! conn->get_data_stream ()->good ())
736 {
737 _p_error ("communication error in initialization");
738 _exit (1);
739 }
740 not_connected = 0;
741 dsprintf ("connect, server %i, host %i, retry %i, good result read, header written and read and datastream good, breaking\n", server_no, i, j);
742 break;
743 }
744 }
745 else if (errno != ECONNREFUSED && errno != EINTR)
746 {
747 perror ("connect");
748 break;
749 }
750 else
751 usleep (5000);
752 }
753
754 if (not_connected)
755 {
756 _p_error ("unable to connect to %s", hosts(i - 1).c_str ());
757 _exit (1);
758 }
759 }
760
761 octave_parallel_connections *conns = new octave_parallel_connections
762 (network, uuid.c_str (), true);
763 octave_value sockets (conns);
764
765 __pwg.__free ();
766
767 #ifdef HAVE_OCTAVE_INTERPRETER_H
768 OCTAVE__INTERPRETER::the_interpreter () -> interactive (false);
769 #else
770 interactive = false;
771 #endif
772
773 // install 'sockets' as Octave variable
774 OCTAVE__INTERPRETER__SYMBOL_TABLE__ASSIGN ("sockets", sockets);
775 dsprintf ("'sockets' installed\n");
776
777 int cd_ok = OCTAVE__SYS__ENV::chdir (directory.c_str ());
778 if (! cd_ok)
779 {
780 OCTAVE__SYS__ENV::chdir ("/tmp");
781 dsprintf ("performed chdir to /tmp\n");
782 }
783 else
784 dsprintf ("performed chdir to %s\n", directory.c_str ());
785
786 dsprintf ("calling function reval_loop\n");
787 reval_loop (cmd_str); // does not return
788
789 return retval;
790 }
791