1 /*
2 
3 Copyright (C) 2015-2019 Olaf Till <i7tiol@t-online.de>
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 3 of the License, or
8 (at your option) any later version.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; If not, see <http://www.gnu.org/licenses/>.
17 
18 */
19 
20 #ifndef __OCT_PARALLEL_STREAMS__
21 
22 #define __OCT_PARALLEL_STREAMS__
23 
24 #ifdef HAVE_LIBGNUTLS
25 #include <gnutls/gnutls.h>
26 #ifdef HAVE_LIBGNUTLS_EXTRA
27 #include <gnutls/extra.h>
28 #endif
29 #endif
30 
31 #include <string.h>
32 
33 #include <signal.h>
34 
35 #include "parallel-gnutls.h"
36 
// Kind of transport protection used by a connection.
enum encryption_type
  {
    // bare sockets
    ENCR_RAW,

    // gnutls TLS SRP
    ENCR_TLS_SRP
  };
42 
// Version number of the protocol.
#define PROTOCOL_VERSION 1

// Handler which can be installed for SIGINT (see class
// octave_parallel_maybe_inthandler).
void octave_parallel_inthandler (int signal);

// Result is checked by the constructor of octave_parallel_streambuf;
// 'false' puts the new stream buffer into an error state.
bool octave_parallel_assert_maybe_inthandler (void);

// Flag which is reset before, and polled during, the reading and
// writing loops of the stream buffers.
extern volatile sig_atomic_t octave_parallel_interrupted;
50 
51 class
52 octave_parallel_maybe_inthandler
53 {
54   // This class is meant to have only one object instantiated.
55 
56 public:
57 
octave_parallel_maybe_inthandler(void)58   octave_parallel_maybe_inthandler (void)
59     : store_errno (0), oldsigset (NULL)
60   {
61     oldsigset = gnulib_alloc_sigset ();;
62 
63     if (gnulib_get_sigmask (oldsigset))
64       {
65         store_errno = errno;
66         return;
67       }
68 
69     if (gnulib_sigismember (oldsigset, SIGINT))
70       {
71         // SIGINT is blocked. (We probably have a (newer) version of
72         // Octave in which there is a separate signal handling
73         // thread.) Install a signal handler; it can remain installed,
74         // it will only have an effect during the periods when we
75         // temporarily unblock SIGINT to make some system calls
76         // interruptable.
77 
78         if (gnulib_install_sighandler (SIGINT, &octave_parallel_inthandler))
79           {
80             store_errno = errno;
81 
82             return;
83           }
84       }
85   }
86 
~octave_parallel_maybe_inthandler(void)87   ~octave_parallel_maybe_inthandler (void)
88   {
89     free (oldsigset);
90   }
91 
good(void)92   bool good (void)
93   {
94     return store_errno == 0;
95   }
96 
97 private:
98 
99   int store_errno;
100 
101   void *oldsigset;
102 };
103 
104 // temporarily remove SIGINT from set of blocked signals; there are
105 // still racing conditions which may necessitate hitting ctrl-c more
106 // than once
107 class
108 octave_parallel_open_sigmask
109 {
110 public:
111 
octave_parallel_open_sigmask(void)112   octave_parallel_open_sigmask (void)
113     : restore_sigmask (false), oldsigset (NULL)
114   {
115     oldsigset = gnulib_alloc_sigset ();
116 
117     if (gnulib_get_sigmask (oldsigset)
118         || gnulib_unblock (initialize_newset ()))
119       return;
120 
121     restore_sigmask = true;
122   }
123 
~octave_parallel_open_sigmask(void)124   ~octave_parallel_open_sigmask (void)
125   {
126     if (restore_sigmask)
127       gnulib_set_sigmask (oldsigset);
128 
129     free (oldsigset);
130   }
131 
132 private:
133 
134   bool restore_sigmask;
135 
136   void *oldsigset;
137 
138   class
139   sigset_guard
140   {
141   public:
142 
sigset_guard(void)143     sigset_guard (void) : newsigset (NULL)
144     {
145       newsigset = gnulib_alloc_sigset ();
146     }
147 
get(void)148     void *get (void)
149     {
150       return newsigset;
151     }
152 
~sigset_guard(void)153     ~sigset_guard (void)
154     {
155       free (newsigset);
156     }
157 
158   private:
159 
160     void *newsigset;
161   };
162 
initialize_newset(void)163   static void *initialize_newset (void)
164   {
165     static bool first_call = true;
166     static sigset_guard __sigset_guard;
167 
168     if (first_call)
169       {
170         gnulib_set_newset_one_signal (__sigset_guard.get (), SIGINT);
171 
172         first_call = false;
173       }
174 
175     return __sigset_guard.get ();
176   }
177 };
178 
179 /*
180 Although we use the standard file format, we can't rely on
181 gnutls_srp_set_server_credentials_file(), since current gnutls uses
182 standard C library streams to read the files (external user space
183 buffers) and own automatic variables which are not zero'ed to hold the
184 sensitive data.
185  */
186 #ifdef HAVE_LIBGNUTLS
187 extern "C" int octave_parallel_server_credentials_callback
188 (gnutls_session_t session, const char *username, gnutls_datum_t *salt,
189  gnutls_datum_t *verifier, gnutls_datum_t *g, gnutls_datum_t *n);
190 #endif
191 
192 
193 // It is better to have classes for handling errors in input and
194 // output. For some tasks we can't use default standard C++
195 // io-classes: file-io with sensitive data should not be buffered,
196 // sockets have a system file-descriptor in the first place (and on
197 // some systems 'read' and 'write' may not work for them), and gnutls
198 // connections have special functions for sending and receiving. On
199 // the other hand, it is necessary to have a std::ostream and a
// std::istream for the connection, since Octave's functions for saving
// and loading data require them.
202 //
203 // To address these related problems, a common parent class, derived
204 // from std::streambuf, and several derived classes are declared
205 // here. Also, a class is provided which takes one of these
206 // streambuf objects and also contains a std::ostream and a
207 // std::istream which reference the streambuf object.
208 
209 // parent class
210 class
211 octave_parallel_streambuf : public std::streambuf
212 {
213 public:
214 
octave_parallel_streambuf(void)215   octave_parallel_streambuf (void)
216     : readable (false), writeable (false), fid (-1), store_errno (0),
217       terminated (false)
218   {
219     // no character in the one-character-wide buffer at the beginning
220     setg (&inbuf, (&inbuf) + 1, (&inbuf) + 1);
221 
222     if (! octave_parallel_assert_maybe_inthandler ())
223       store_errno = -1;
224   }
225 
~octave_parallel_streambuf(void)226   virtual ~octave_parallel_streambuf (void)
227   {
228     inbuf = '\0';
229   }
230 
good(void)231   bool good (void)
232   {
233     return (store_errno == 0 && ! terminated);
234   }
235 
check_interrupt_handler(void)236   bool check_interrupt_handler (void)
237   {
238     if (octave_parallel_interrupted)
239       {
240         terminated = true;
241 
242         return true;
243       }
244     else
245       return false;
246   }
247 
io_error(void)248   bool io_error (void)
249   {
250     return (store_errno);
251   }
252 
close(void)253   void close (void)
254   {
255     if (fid >= 0)
256       {
257         int ret;
258 
259         // don't repeat interrupted close()
260         ret = do_close ();
261 
262         if (ret == -1)
263           store_errno = errno;
264 
265         fid = -1;
266       }
267   }
268 
write(const void * buf,size_t count)269   ssize_t write (const void *buf, size_t count)
270   {
271     if (! good ())
272       return -1;
273 
274     if (fid < 0)
275       return -1;
276 
277     if (count == 0)
278       return 0;
279 
280     ssize_t ret;
281 
282     size_t left = count;
283 
284     {
285       octave_parallel_interrupted = 0;
286 
287       octave_parallel_open_sigmask dummy;
288 
289       call_octave_quit ();
290 
291       do
292         {
293           if ((ret = do_write ((char *) buf + count - left, left)) > 0)
294             left -= ret;
295 
296           call_octave_quit ();
297         }
298       while (left && ret > 0 && ! check_interrupt_handler ());
299 
300     }
301 
302     if (ret == -1)
303       store_errno = errno;
304     else if (left)
305       store_errno = -1;
306 
307     if (store_errno)
308       return -1;
309     else
310       return count;
311   }
312 
gnutls_write_alert_for_flushing(void)313   virtual int gnutls_write_alert_for_flushing (void)
314   {
315     if (! good () || fid < 0)
316       return -1;
317     else
318       return 0;
319   }
320 
read(void * buf,size_t count)321   ssize_t read (void *buf, size_t count)
322   {
323     if (! good ())
324       return -1;
325 
326     if (fid < 0)
327       return -1;
328 
329     if (count == 0)
330       return 0;
331 
332     int ndiff = 0;
333     if (gptr () != egptr ())
334       {
335         *((char *) buf) = *(gptr ());
336         gbump (1);
337         buf = (void *)(((char *) buf) + 1);
338         count--;
339         ndiff = 1;
340       }
341 
342     int ret = read_directly (buf, count);
343     if (ret > -1)
344       ret += ndiff;
345     return ret;
346   }
347 
348   bool readable;
349 
350   bool writeable;
351 
get_fid(void)352   int get_fid (void)
353   {
354     return fid;
355   }
356 
357   // redefined streambuf virtual functions
358 
xsputn(const char * buf,std::streamsize count)359   std::streamsize xsputn (const char *buf, std::streamsize count)
360   {
361     ssize_t ret = write ((const void *) buf, count);
362 
363     return ret >= 0 ? ret : 0;
364   }
365 
366   int overflow (int c = traits_type::eof ())
367   {
368     if (c != traits_type::eof ())
369       {
370         traits_type::char_type tc = traits_type::to_char_type (c);
371 
372         if (write ((void *) &tc, sizeof (traits_type::char_type)) == -1)
373           return traits_type::eof ();
374       }
375 
376     return traits_type::to_int_type (c);
377   }
378 
xsgetn(char * buf,std::streamsize count)379   std::streamsize xsgetn (char *buf, std::streamsize count)
380   {
381     ssize_t ret = read ((void *) buf, count);
382 
383     return ret >= 0 ? ret : 0;
384   }
385 
386   // the only member capable of storing something in the input buffer;
387   // and the only reading member which never increases gptr()
underflow(void)388   int underflow (void)
389   {
390     if (gptr () != egptr ())
391       return traits_type::to_int_type (*(gptr ()));
392     else
393       {
394         if (read_directly ((void *) eback (), 1) == -1)
395           return traits_type::to_int_type (traits_type::eof ());
396         else
397           {
398             gbump (-1);
399             return traits_type::to_int_type (*(gptr ()));
400           }
401       }
402   }
403 
uflow(void)404   int uflow (void)
405   {
406     if (gptr () != egptr ())
407       {
408         gbump (1);
409         return traits_type::to_int_type (*(gptr () - 1));
410       }
411     else
412       {
413         traits_type::char_type ret;
414 
415         if (read_directly ((void *) &ret, sizeof (traits_type::char_type))
416             == -1)
417           return traits_type::to_int_type (traits_type::eof ());
418         else
419           return traits_type::to_int_type (ret);
420       }
421   }
422 
network_send_4byteint(uint32_t v,bool)423   int network_send_4byteint (uint32_t v, bool)
424   {
425     uint32_t t = gnulib_htonl (v);
426 
427     write ((void *) &t, 4);
428 
429     if (good ())
430       return 0;
431     else
432       return -1;
433   }
434 
network_recv_4byteint(uint32_t & v)435   int network_recv_4byteint (uint32_t &v)
436   {
437     uint32_t t;
438 
439     read ((void *) &t, 4);
440 
441     if (! good ())
442       return -1;
443 
444     v = gnulib_ntohl (t);
445 
446     return 0;
447   }
448 
network_send_4byteint(int32_t v)449   int network_send_4byteint (int32_t v)
450   {
451     if (signed_int_rep () && v < 0)
452       {
453         once_gripe_neg_int_rep ();
454 
455         return -1;
456       }
457 
458     union
459     {
460       int32_t s;
461       uint32_t u;
462     } cast;
463 
464     cast.s = v;
465 
466     return network_send_4byteint (cast.u, true);
467   }
468 
network_recv_4byteint(int32_t & v)469   int network_recv_4byteint (int32_t &v)
470   {
471     uint32_t t;
472 
473     union
474     {
475       int32_t s;
476       uint32_t u;
477     } cast;
478 
479 
480     read ((void *) &t, 4);
481 
482     if (! good ())
483       return -1;
484 
485     cast.u = gnulib_ntohl (t);
486 
487     v = cast.s;
488 
489     if (signed_int_rep () && v < 0)
490       {
491         once_gripe_neg_int_rep ();
492 
493         return -1;
494       }
495 
496     return 0;
497   }
498 
network_send_string(const char * s)499   int network_send_string (const char *s)
500   {
501     uint32_t l = strlen (s);
502 
503     uint32_t t = gnulib_htonl (l);
504 
505     write ((void *) &t, 4);
506 
507     write ((const void *) s, l);
508 
509     if (good ())
510       return 0;
511     else
512       return -1;
513   }
514 
network_recv_string(char * s,uint32_t maxl)515   int network_recv_string (char *s, uint32_t maxl)
516   {
517     uint32_t l, t;
518 
519     read ((void *) &t, 4);
520 
521     if (! good ())
522       return -1;
523 
524     l = gnulib_ntohl (t);
525 
526     if (l > maxl - 1)
527       {
528         _p_error ("string to be received would be longer than buffer");
529 
530         return -1;
531       }
532 
533     s[l] = '\0';
534 
535     read ((void *) s, l);
536 
537     if (good ())
538       return 0;
539     else
540       return -1;
541   }
542 
network_recv_string(std::string & str)543   int network_recv_string (std::string &str)
544   {
545     uint32_t l, t;
546 
547     read ((void *) &t, 4);
548 
549     if (! good ())
550       return -1;
551 
552     l = gnulib_ntohl (t);
553 
554     struct __bufguard
555     {
556       __bufguard (octave_idx_type __l) : __buf (new char[__l]) {}
557       ~__bufguard (void) { delete [] __buf; }
558       char* __get (void) { return __buf; }
559       char *__buf;
560     } __bufg (l);
561 
562     read (__bufg.__get (), l);
563 
564     std::string tstr (__bufg.__get (), l);
565 
566     str = tstr;
567 
568     if (good ())
569       return 0;
570     else
571       return -1;
572   }
573 
signed_int_rep(void)574   static int signed_int_rep (void)
575   {
576     // I know this is paranoid, and that current Octave itself just
577     // relies on twos complement in saving/loading signed integers.
578 
579     static int ret = -1;
580 
581     if (ret == -1)
582       {
583         union
584         {
585           uint32_t u;
586           int32_t s;
587         } t;
588 
589         t.s = -1;
590 
591         if (t.u != 0xFFFFFFFF)
592           ret = 1;
593         else
594           ret = 0;
595       }
596 
597     return ret;
598   }
599 
600 protected:
601 
602   int fid;
603 
604   int store_errno;
605 
call_octave_quit(void)606   void call_octave_quit (void)
607   {
608     terminated = true;
609 
610     OCTAVE_QUIT;
611 
612     terminated = false;
613   }
614 
615   virtual ssize_t do_write (const void *buf, size_t count) = 0;
616 
617   virtual ssize_t do_read (void *buf, size_t count) = 0;
618 
619   virtual int do_close (void) = 0;
620 
621 private:
622 
read_directly(void * buf,size_t count)623   ssize_t read_directly (void *buf, size_t count)
624   {
625     if (! good ())
626       return -1;
627 
628     if (fid < 0)
629       return -1;
630 
631     if (count == 0)
632       return 0; // if nothing needs to be written, there's no need to
633                 // risk read() still being interrupted by a signal
634 
635     ssize_t ret;
636 
637     size_t left = count;
638 
639     {
640       octave_parallel_interrupted = 0;
641 
642       octave_parallel_open_sigmask dummy;
643 
644       call_octave_quit ();
645 
646       do
647         {
648           if ((ret = do_read ((char *) buf + count - left, left)) > 0)
649             left -= ret;
650 
651           call_octave_quit ();
652         }
653       while (left && ret > 0 && ! check_interrupt_handler ());
654 
655     }
656 
657     if (ret == -1)
658       store_errno = errno;
659     else if (left)
660       store_errno = -1;
661 
662     if (store_errno)
663       return -1;
664     else
665       return count;
666   }
667 
668   // We need to provide an input buffer of at least one character to
669   // be able to implement underflow(), needed by sgetc(), possibly
670   // needed by istream.getline(). This buffer must be zero'ed at
671   // destruction.
672   char inbuf;
673 
once_gripe_neg_int_rep(void)674   void once_gripe_neg_int_rep (void)
675   {
676     static bool first_time = true;
677 
678     if (first_time)
679       {
680         _p_error ("This machine doesn't seem to use twos complement as negative integer representation. If you want this to be supported, please file a bug report.");
681 
682         first_time = false;
683       }
684   }
685 
686   bool terminated;
687 }; // class octave_parallel_streambuf
688 
689 class
690 octave_parallel_pipe_streambuf : public octave_parallel_streambuf
691 {
692 public:
693 
octave_parallel_pipe_streambuf(octave_parallel::pipes::pipe_wrapper & pw_arg,bool in)694   octave_parallel_pipe_streambuf (octave_parallel::pipes::pipe_wrapper &pw_arg,
695                                   bool in)
696     : pw (pw_arg)
697   {
698     if (in)
699       readable = true;
700     else
701       writeable = true;
702 
703     // this is a dummy
704     fid = 1;
705   }
706 
~octave_parallel_pipe_streambuf(void)707   ~octave_parallel_pipe_streambuf (void)
708   {
709 
710   }
711 
712 protected:
713 
do_read(void * buf,size_t count)714   ssize_t do_read (void *buf, size_t count)
715   {
716     return pw.read (buf, count);
717   }
718 
do_write(const void * buf,size_t count)719   ssize_t do_write (const void *buf, size_t count)
720   {
721     return pw.write (buf, count);
722   }
723 
do_close(void)724   int do_close (void)
725   {
726     return pw.close ();
727   }
728 
729 private:
730 
731   octave_parallel::pipes::pipe_wrapper &pw;
732 };
733 
734 // To avoid user space buffers, provided by external libraries, which
735 // can not be zero'ed after use, we have to directly use system calls
736 // for reading and writing sensitive data from/to files. Luckily, for
737 // this small amount of file-io we don't need to construct a custom
738 // user space buffer.
739 class
740 octave_parallel_file_streambuf : public octave_parallel_streambuf
741 {
742 public:
743 
744   octave_parallel_file_streambuf (const char *path, int flags, mode_t mode = 0)
745   {
746     while ((fid = open (path, flags, mode)) == -1 && errno == EINTR);
747 
748     if (fid == -1)
749       store_errno = errno;
750     else
751       {
752         // care is needed here, one of the flag constants can
753         // represent the value zero (O_RDONLY on my system), so that
754         // checking for (flags & flag_constant) does not work
755 
756         if ((O_RDONLY == 0 && ! (flags & O_WRONLY)) ||
757             (O_RDONLY && flags & (O_RDONLY | O_RDWR)))
758           readable = true;
759 
760         if ((O_WRONLY == 0 && ! (flags & O_RDONLY)) ||
761             (O_WRONLY && flags & (O_WRONLY | O_RDWR)))
762           writeable = true;
763       }
764   }
765 
~octave_parallel_file_streambuf(void)766   ~octave_parallel_file_streambuf (void)
767   {
768     if (fid >= 0)
769       while (::close (fid) == -1 && errno == EINTR);
770   }
771 
772 protected:
773 
do_read(void * buf,size_t count)774   ssize_t do_read (void *buf, size_t count)
775   {
776     return ::read (fid, buf, count);
777   }
778 
do_write(const void * buf,size_t count)779   ssize_t do_write (const void *buf, size_t count)
780   {
781     return ::write (fid, buf, count);
782   }
783 
do_close(void)784   int do_close (void)
785   {
786     return ::close (fid);
787   }
788 }; // class octave_parallel_file_streambuf
789 
790 // TCP sockets without authentication or encryption, as an alternative
791 // to encrypted connections (currently only TLS-SRP), which are (is)
792 // derived from this.
793 class
794 octave_parallel_socket_streambuf : public octave_parallel_streambuf
795 {
796 public:
797 
798   // this needs c++11
octave_parallel_socket_streambuf(int sid,bool server)799   octave_parallel_socket_streambuf (int sid, bool server)
800     : octave_parallel_socket_streambuf (sid, server, ENCR_RAW)
801   {
802 
803   }
804 
805   // This is extra, so that (derived) encrypted connection streambufs
806   // can give it their own 'encrtype' argument to communicate and
807   // check encryption type before encryption starts.
808   octave_parallel_socket_streambuf (int sid, bool server,
809                                     encryption_type encrtype);
810 
~octave_parallel_socket_streambuf(void)811   ~octave_parallel_socket_streambuf (void)
812   {
813     if (fid >= 0)
814       while (gnulib_close (fid) == -1 && errno == EINTR);
815   }
816 
817   // redefined streambuf virtual functions
818 
819   std::streamsize showmanyc (void);
820 
821 protected:
822 
do_read(void * buf,size_t count)823   ssize_t do_read (void *buf, size_t count)
824   {
825     return gnulib_recv (fid, buf, count, 0);
826   }
827 
do_write(const void * buf,size_t count)828   ssize_t do_write (const void *buf, size_t count)
829   {
830     return gnulib_send (fid, buf, count, 0);
831   }
832 
do_close(void)833   int do_close (void)
834   {
835     return gnulib_close (fid);
836   }
837 };
838 
839 // for usage with class octave_parallel_gnutls_streambuf, one parent
840 // class and one derived class for server and client, respectively;
841 // objects meant to be allocated; the class storing the pointer should
842 // check the refcount and care for deallocation
843 #ifdef HAVE_LIBGNUTLS
844 class
845 octave_parallel_gnutls_srp_credentials
846 {
847 public:
848 
octave_parallel_gnutls_srp_credentials(void)849   octave_parallel_gnutls_srp_credentials (void)
850   : cred (NULL), refcount (0)
851   {
852 
853   }
854 
~octave_parallel_gnutls_srp_credentials(void)855   virtual ~octave_parallel_gnutls_srp_credentials (void)
856   {
857 
858   }
859 
get_ref(void)860   void *get_ref (void)
861   {
862     refcount++;
863     dlprintf ("cred incr its refcount to %i\n", refcount);
864 
865     return cred;
866   }
867 
release_ref(void)868   int release_ref (void)
869   {
870     dlprintf ("cred will decr its refcount to %i\n", refcount - 1);
871     return --refcount;
872   }
873 
check_ref(void)874   int check_ref (void)
875   {
876     dlprintf ("cred refcount was checked, is %i\n", refcount);
877     return refcount;
878   }
879 
check_cred(void)880   void *check_cred (void)
881   {
882     return cred;
883   }
884 
885 protected:
886 
887   void *cred;
888 
889   virtual void free_credentials (void) = 0;
890 
891 private:
892 
893   int refcount;
894 };
895 
896 // see comment of parent class
897 class
898 octave_parallel_gnutls_srp_client_credentials :
899   public octave_parallel_gnutls_srp_credentials
900 {
901 public:
902 
903   octave_parallel_gnutls_srp_client_credentials (const char *user,
904                                                  char *apasswd = NULL);
905 
906   octave_parallel_gnutls_srp_client_credentials (std::string passwd_file);
907 
~octave_parallel_gnutls_srp_client_credentials(void)908   ~octave_parallel_gnutls_srp_client_credentials (void)
909   {
910     dlprintf ("ccred destr called\n");
911     if (password)
912       {
913         memset ((void *) password, '\0', strlen (password));
914 
915         if (pw_allocated)
916           delete [] password;
917       }
918 
919     if (cred)
920       {
921         dlprintf ("ccred destr will call 'free_credentials()'\n");
922         free_credentials ();
923       }
924   }
925 
get_passwd(void)926   const char *get_passwd (void)
927   {
928     return password;
929   }
930 
931 protected:
932 
free_credentials(void)933   void free_credentials (void)
934   {
935     gnutls_srp_free_client_credentials
936       ((gnutls_srp_client_credentials_t) cred);
937     cred = NULL;
938   }
939 
940 private:
941 
942   char *password;
943 
944   bool pw_allocated;
945 };
946 
947 // see comment of parent class
948 class
949 octave_parallel_gnutls_srp_server_credentials :
950   public octave_parallel_gnutls_srp_credentials
951 {
952 public:
953 
octave_parallel_gnutls_srp_server_credentials(std::string & passwd_file)954   octave_parallel_gnutls_srp_server_credentials
955   (std::string &passwd_file)
956   {
957     if (! gnutls_srp_allocate_server_credentials
958         ((gnutls_srp_server_credentials_t *) &cred))
959       {
960         // initialize filename in callback function
961         octave_parallel_server_credentials_callback (NULL, passwd_file.c_str (),
962                                                      NULL, NULL, NULL, NULL);
963 
964         gnutls_srp_set_server_credentials_function
965           ((gnutls_srp_server_credentials_t) cred,
966            octave_parallel_server_credentials_callback);
967       }
968   }
969 
~octave_parallel_gnutls_srp_server_credentials(void)970   ~octave_parallel_gnutls_srp_server_credentials (void)
971   {
972     if (cred)
973       free_credentials ();
974   }
975 
976 protected:
977 
free_credentials(void)978   void free_credentials (void)
979   {
980     gnutls_srp_free_server_credentials
981       ((gnutls_srp_server_credentials_t) cred);
982     cred = NULL;
983   }
984 };
985 
986 // TCP sockets with a gnutls connection.
987 class
988 octave_parallel_gnutls_streambuf : public octave_parallel_socket_streambuf
989 {
990 
991 public:
992 
993   octave_parallel_gnutls_streambuf
994   (int sid, octave_parallel_gnutls_srp_credentials *accred, bool server);
995 
~octave_parallel_gnutls_streambuf(void)996   ~octave_parallel_gnutls_streambuf (void)
997   {
998     if (session)
999       gnutls_deinit (session);
1000 
1001     if (ccred)
1002       dlprintf ("gstreambuf destructor will requcest decr of cred refcount\n");
1003     int tpdebug;
1004     if (ccred && (tpdebug = ccred->release_ref ()) <= 0)
1005       {
1006         dlprintf ("gstreambuf destructor got decr cred refcount of %i and will delete cred\n", tpdebug);
1007         delete ccred;
1008       }
1009   }
1010 
server_get_username(void)1011   const char *server_get_username (void)
1012   {
1013     return gnutls_srp_server_get_username (session);
1014   }
1015 
gnutls_write_alert_for_flushing(void)1016   int gnutls_write_alert_for_flushing (void)
1017   {
1018     if (! good () || fid < 0)
1019       return -1;
1020 
1021     int ret;
1022     // this is a dummy alert to force flushing
1023     {
1024       octave_parallel_interrupted = 0;
1025 
1026       octave_parallel_open_sigmask dummy;
1027 
1028       call_octave_quit ();
1029 
1030       do
1031         {
1032           ret = gnutls_alert_send
1033             (session, GNUTLS_AL_WARNING, GNUTLS_A_INTERNAL_ERROR);
1034 
1035           call_octave_quit ();
1036         }
1037       while (ret == GNUTLS_E_AGAIN && ! check_interrupt_handler ());
1038 
1039     }
1040 
1041     if (ret != GNUTLS_E_SUCCESS)
1042       {
1043         store_errno = -1;
1044         return -1;
1045       }
1046     else
1047       return 0;
1048   }
1049 
1050 protected:
1051 
do_read(void * buf,size_t count)1052   ssize_t do_read (void *buf, size_t count)
1053   {
1054     ssize_t ret;
1055 
1056     while ((ret = gnutls_record_recv (session, buf, count)) ==
1057            GNUTLS_E_WARNING_ALERT_RECEIVED)
1058       gnutls_alert_get (session); // discard the alert
1059 
1060     if (ret < 0)
1061       {
1062         if (ret == GNUTLS_E_INTERRUPTED)
1063           errno = EINTR;
1064         else if (ret == GNUTLS_E_AGAIN)
1065           errno = EAGAIN;
1066 
1067         ret = -1;
1068       }
1069 
1070     return ret;
1071   }
1072 
do_write(const void * buf,size_t count)1073   ssize_t do_write (const void *buf, size_t count)
1074   {
1075     ssize_t ret;
1076 
1077     while ((ret = gnutls_record_send (session, buf, count)) ==
1078            GNUTLS_E_WARNING_ALERT_RECEIVED)
1079       gnutls_alert_get (session); // discard the alert
1080 
1081     if (ret < 0)
1082       {
1083         if (ret == GNUTLS_E_INTERRUPTED)
1084           errno = EINTR;
1085         else if (ret == GNUTLS_E_AGAIN)
1086           errno = EAGAIN;
1087 
1088         ret = -1;
1089       }
1090 
1091     return ret;
1092   }
1093 
do_close(void)1094   int do_close (void)
1095   {
1096     if (session)
1097       {
1098         gnutls_deinit (session);
1099 
1100         session = NULL;
1101       }
1102 
1103     if (ccred && ccred->release_ref () <= 0)
1104       delete ccred;
1105 
1106     ccred = NULL;
1107 
1108     return octave_parallel_socket_streambuf::do_close ();
1109   }
1110 
1111   octave_parallel_gnutls_srp_credentials *ccred;
1112 
1113   void *cred;
1114 
1115   gnutls_session_t session;
1116 };
1117 
1118 #endif // HAVE_LIBGNUTLS
1119 
1120 class
1121 octave_parallel_stream
1122 {
1123 public:
1124 
octave_parallel_stream(octave_parallel_streambuf * strbuf_arg)1125   octave_parallel_stream (octave_parallel_streambuf *strbuf_arg)
1126   : strbuf (strbuf_arg), istr (NULL), ostr (NULL)
1127   {
1128     if (strbuf->readable)
1129       istr = new std::istream (strbuf);
1130 
1131     if (strbuf->writeable)
1132       ostr = new std::ostream (strbuf);
1133   }
1134 
~octave_parallel_stream(void)1135   ~octave_parallel_stream (void)
1136   {
1137     if (istr)
1138       delete istr;
1139 
1140     if (ostr)
1141       delete ostr;
1142 
1143     dlprintf ("pstream destructor will delete streambuf\n");
1144     delete strbuf;
1145   }
1146 
good(void)1147   bool good (void) const
1148   {
1149     return strbuf->good ();
1150   }
1151 
io_error(void)1152   bool io_error (void) const
1153   {
1154     return strbuf->io_error ();
1155   }
1156 
close(void)1157   void close (void) const
1158   {
1159     if (strbuf)
1160       strbuf->close ();
1161   }
1162 
get_istream(void)1163   std::istream &get_istream (void) const
1164   {
1165     return *istr;
1166   }
1167 
get_ostream(void)1168   std::ostream &get_ostream (void) const
1169   {
1170     return *ostr;
1171   }
1172 
network_send_4byteint(uint32_t v,bool)1173   int network_send_4byteint (uint32_t v, bool) const
1174   {
1175     return strbuf->network_send_4byteint (v, true);
1176   }
1177 
network_recv_4byteint(uint32_t & v)1178   int network_recv_4byteint (uint32_t &v) const
1179   {
1180     return strbuf->network_recv_4byteint (v);
1181   }
1182 
network_send_4byteint(int32_t v)1183   int network_send_4byteint (int32_t v) const
1184   {
1185     return strbuf->network_send_4byteint (v);
1186   }
1187 
network_recv_4byteint(int32_t & v)1188   int network_recv_4byteint (int32_t &v) const
1189   {
1190     return strbuf->network_recv_4byteint (v);
1191   }
1192 
network_send_string(const char * s)1193   int network_send_string (const char *s) const
1194   {
1195     return strbuf->network_send_string (s);
1196   }
1197 
network_recv_string(char * s,int maxl)1198   int network_recv_string (char *s, int maxl) const
1199   {
1200     return strbuf->network_recv_string (s, maxl);
1201   }
1202 
network_recv_string(std::string & str)1203   int network_recv_string (std::string &str) const
1204   {
1205     return strbuf->network_recv_string (str);
1206   }
1207 
get_strbuf(void)1208   octave_parallel_streambuf *get_strbuf (void)
1209   {
1210     return strbuf;
1211   }
1212 
signed_int_rep(void)1213   static int signed_int_rep (void)
1214   {
1215     return octave_parallel_streambuf::signed_int_rep ();
1216   }
1217 
1218 private:
1219 
1220   octave_parallel_streambuf *strbuf;
1221 
1222   std::istream *istr;
1223 
1224   std::ostream *ostr;
1225 };
1226 
1227 #endif // __OCT_PARALLEL_STREAMS__
1228 
1229 
1230 /*
1231 ;;; Local Variables: ***
1232 ;;; mode: C++ ***
1233 ;;; End: ***
1234 */
1235