1 /*
2 
3 Copyright (C) 2013-2019 Olaf Till <i7tiol@t-online.de>
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 3 of the License, or
8 (at your option) any later version.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; If not, see <http://www.gnu.org/licenses/>.
17 
18 */
19 
20 #include <octave/oct.h>
21 #include <octave/ov-struct.h>
22 #include <octave/Cell.h>
23 
24 #include <stdio.h>
25 
26 #include "command.h"
27 #include "error-helpers.h"
28 #include <libpq/libpq-fs.h>
29 
30 // PKG_ADD: autoload ("pq_lo_import", "pq_interface.oct");
31 // PKG_ADD: autoload ("pq_lo_export", "pq_interface.oct");
32 // PKG_ADD: autoload ("pq_lo_unlink", "pq_interface.oct");
33 // PKG_DEL: autoload ("pq_lo_import", "pq_interface.oct", "remove");
34 // PKG_DEL: autoload ("pq_lo_export", "pq_interface.oct", "remove");
35 // PKG_DEL: autoload ("pq_lo_unlink", "pq_interface.oct", "remove");
36 
37 #define OCT_PQ_BUFSIZE 1024
38 
39 // For cleanup handling this is a class.
40 class pipe_to_lo
41 {
42 public:
43 
44   pipe_to_lo (octave_pq_connection_rep &, const char *, bool, std::string &);
45 
46   ~pipe_to_lo (void);
47 
valid(void)48   bool valid (void) { return oid_valid; }
49 
get_oid(void)50   Oid get_oid (void) { return oid; }
51 
52   std::string &msg;
53 
54 private:
55 
56   octave_pq_connection_rep &oct_pq_conn;
57 
58   PGconn *conn;
59 
60   Oid oid;
61 
62   FILE *fp;
63 
64   bool oid_valid;
65 
66   int lod;
67 
68   bool commit;
69 };
70 
pipe_to_lo(octave_pq_connection_rep & a_oct_pq_conn,const char * cmd,bool acommit,std::string & amsg)71 pipe_to_lo::pipe_to_lo (octave_pq_connection_rep &a_oct_pq_conn,
72                         const char *cmd, bool acommit, std::string &amsg)
73   : msg (amsg), oct_pq_conn (a_oct_pq_conn),
74     conn (a_oct_pq_conn.octave_pq_get_conn ()), oid (0), fp (NULL),
75     oid_valid (false), lod (-1), commit (acommit)
76 {
77   BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
78   oid = lo_creat (conn, INV_WRITE);
79   END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
80   if (! oid || oid == InvalidOid)
81     {
82       msg = PQerrorMessage (conn);
83 
84       oid = 0;
85 
86       return;
87     }
88 
89   if (! (fp = popen (cmd, "r")))
90     {
91       msg = "could not create pipe";
92 
93       return;
94     }
95 
96   BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
97   lod = lo_open (conn, oid, INV_WRITE);
98   END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
99   if (lod == -1)
100     {
101       msg = PQerrorMessage (conn);
102 
103       return;
104     }
105 
106   char buff [OCT_PQ_BUFSIZE];
107 
108   int nb = 0, pnb = 0; // silence inadequate warnings by initializing
109                        // them
110 
111   while (true)
112     {
113       BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
114       nb = fread (buff, 1, OCT_PQ_BUFSIZE, fp);
115       END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
116 
117       if (! nb) break;
118 
119       BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
120       pnb = lo_write (conn, lod, buff, nb);
121       END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
122       if (pnb != nb)
123         {
124           msg = PQerrorMessage (conn);
125 
126           break;
127         }
128       }
129   if (nb) return;
130 
131   if (pclose (fp) == -1)
132     _p_error ("error closing pipe");
133 
134   fp = NULL;
135 
136   if (lo_close (conn, lod))
137     msg = PQerrorMessage (conn);
138   else
139     oid_valid = true;
140 
141   lod = -1;
142 }
143 
~pipe_to_lo(void)144 pipe_to_lo::~pipe_to_lo (void)
145 {
146   if (lod != -1)
147     {
148       if (lo_close (conn, lod))
149         _p_error ("%s", PQerrorMessage (conn));
150 
151       lod = -1;
152     }
153 
154   if (oid && ! oid_valid)
155     {
156       if (lo_unlink (conn, oid) == -1)
157         _p_error ("error unlinking new large object with oid %i", oid);
158     }
159   else
160     oid = 0;
161 
162   if (fp)
163     {
164       if (pclose (fp) == -1)
165         _p_error ("error closing pipe");
166 
167       fp = NULL;
168     }
169 
170   if (commit)
171     {
172       std::string cmd ("commit;");
173       Cell params;
174       Cell ptypes (1, 0);
175       Cell rtypes;
176       std::string caller ("pq_lo_import");
177       command c (oct_pq_conn, cmd, params, ptypes, rtypes, caller);
178 
179       if (c.good ())
180         c.process_single_result ();
181 
182       if (! c.good ())
183         _p_error ("%s: could not commit", caller.c_str ());
184     }
185 }
186 
187 // For cleanup handling this is a class.
188 class lo_to_pipe
189 {
190 public:
191 
192   lo_to_pipe (octave_pq_connection_rep &, Oid, const char *, bool, std::string &);
193 
194   ~lo_to_pipe (void);
195 
valid(void)196   bool valid (void) { return success; }
197 
198   std::string &msg;
199 
200 private:
201 
202   octave_pq_connection_rep &oct_pq_conn;
203 
204   PGconn *conn;
205 
206   Oid oid;
207 
208   FILE *fp;
209 
210   bool success;
211 
212   int lod;
213 
214   bool commit;
215 };
216 
lo_to_pipe(octave_pq_connection_rep & a_oct_pq_conn,Oid aoid,const char * cmd,bool acommit,std::string & amsg)217 lo_to_pipe::lo_to_pipe (octave_pq_connection_rep &a_oct_pq_conn, Oid aoid,
218                         const char *cmd, bool acommit, std::string &amsg) :
219   msg (amsg), oct_pq_conn (a_oct_pq_conn),
220   conn (a_oct_pq_conn.octave_pq_get_conn ()), oid (aoid), fp (NULL),
221   success (false), lod (-1), commit (acommit)
222 {
223   if (! (fp = popen (cmd, "w")))
224     {
225       msg = "could not create pipe";
226 
227       return;
228     }
229 
230   BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
231   lod = lo_open (conn, oid, INV_READ);
232   END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
233   if (lod == -1)
234     {
235       msg = PQerrorMessage (conn);
236 
237       return;
238     }
239 
240   char buff [OCT_PQ_BUFSIZE];
241 
242   int nb = 0, pnb = 0; // silence inadequate warnings by initializing
243                        // them
244 
245   while (true)
246     {
247       BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
248       pnb = lo_read (conn, lod, buff, OCT_PQ_BUFSIZE);
249       END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
250 
251       if (pnb == -1)
252         {
253           msg = PQerrorMessage (conn);
254 
255           break;
256         }
257 
258       if (! pnb) break;
259 
260       BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
261       nb = fwrite (buff, 1, pnb, fp);
262       END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
263       if (nb != pnb)
264         {
265           msg = "error writing to pipe";
266 
267           break;
268         }
269     }
270   if (pnb) return;
271 
272   if (pclose (fp) == -1)
273     _p_error ("error closing pipe");
274 
275   fp = NULL;
276 
277   if (lo_close (conn, lod))
278     msg = PQerrorMessage (conn);
279   else
280     success = true;
281 
282   lod = -1;
283 }
284 
~lo_to_pipe(void)285 lo_to_pipe::~lo_to_pipe (void)
286 {
287   if (lod != -1)
288     {
289       if (lo_close (conn, lod))
290         _p_error ("%s", PQerrorMessage (conn));
291 
292       lod = -1;
293     }
294 
295   if (fp)
296     {
297       if (pclose (fp) == -1)
298         _p_error ("error closing pipe");
299 
300       fp = NULL;
301     }
302 
303   if (commit)
304     {
305       std::string cmd ("commit;");
306       Cell params;
307       Cell ptypes (1, 0);
308       Cell rtypes;
309       std::string caller ("pq_lo_export");
310       command c (oct_pq_conn, cmd, params, ptypes, rtypes, caller);
311 
312       if (c.good ())
313         c.process_single_result ();
314 
315       if (! c.good ())
316         _p_error ("%s: could not commit", caller.c_str ());
317     }
318 }
319 
320 DEFUN_DLD (pq_lo_import, args, ,
321            "-*- texinfo -*-\n\
322 @deftypefn {Loadable Function} {@var{oid} =} pq_lo_import (@var{connection}, @var{path})\n\
323 Imports the file in @var{path} on the client side as a large object into the database associated with @var{connection} and returns the Oid of the new large object. If @var{path} ends with a @code{|}, it is take as a shell command whose output is piped into a large object.\n\
324 @end deftypefn")
325 {
326   std::string fname ("pq_lo_import");
327 
328   octave_value retval;
329 
330   if (args.length () != 2 ||
331       args(0).type_id () != octave_pq_connection::static_type_id ())
332     {
333       print_usage ();
334 
335       return retval;
336     }
337 
338   std::string path;
339   CHECK_ERROR (path = args(1).string_value (), retval,
340                "%s: second argument can not be converted to a string",
341                fname.c_str ());
342 
343   bool from_pipe = false;
344   unsigned int l = path.size ();
345   if (l && path[l - 1] == '|')
346     {
347       unsigned int pos;
348       // There seemed to be a bug in my C++ library so that
349       // path.find_last_not_of (" \t\n\r\f", l - 1))
350       // returned l - 1 ! This is the workaround.
351       path.erase (l - 1, 1);
352       if ((pos = path.find_last_not_of (" \t\n\r\f"))
353           == std::string::npos)
354         {
355           error ("%s: no command found to pipe from", fname.c_str ());
356 
357           return retval;
358         }
359       path.erase (pos + 1, std::string::npos);
360 
361       from_pipe = true;
362     }
363 
364   const octave_base_value& rep = (args(0).get_rep ());
365 
366   const octave_pq_connection &oct_pq_conn =
367     dynamic_cast<const octave_pq_connection&> (rep);
368 
369   PGconn *conn = oct_pq_conn.get_rep ()->octave_pq_get_conn ();
370 
371   if (! conn)
372     {
373       error ("%s: connection not open", fname.c_str ());
374       return retval;
375     }
376 
377   bool make_tblock = false;
378   switch (PQtransactionStatus (conn))
379     {
380     case PQTRANS_IDLE:
381       make_tblock = true;
382       break;
383     case PQTRANS_INTRANS:
384       break;
385     case PQTRANS_INERROR:
386       error ("%s: can't manipulate large objects within a failed transaction block",
387              fname.c_str ());
388       return retval;
389     case PQTRANS_UNKNOWN:
390       error ("%s: connection is bad", fname.c_str ());
391       return retval;
392     default: // includes PQTRANS_ACTIVE
393       error ("%s: unexpected connection state", fname.c_str ());
394       return retval;
395     }
396 
397   if (make_tblock)
398     {
399       std::string cmd ("begin;");
400       Cell params;
401       Cell ptypes (1, 0);
402       Cell rtypes;
403       command c (*(oct_pq_conn.get_rep ()), cmd, params, ptypes, rtypes, fname);
404 
405       if (c.good ())
406         c.process_single_result ();
407 
408       if (! c.good ())
409         {
410           error ("%s: could not begin transaction", fname.c_str ());
411           return retval;
412         }
413     }
414 
415   Oid oid = 0;
416 
417   bool import_error = false;
418   std::string msg;
419 
420   if (from_pipe)
421     {
422       pipe_to_lo tp (*(oct_pq_conn.get_rep ()), path.c_str (), make_tblock, msg);
423 
424       make_tblock = false; // commit handled by destructor of pipe_to_lo
425 
426       if (tp.valid ())
427         oid = tp.get_oid ();
428       else
429         import_error = true;
430     }
431   else
432     if (! (oid = lo_import (conn, path.c_str ())))
433       {
434         import_error = true;
435         msg = PQerrorMessage (conn);
436       }
437 
438   // if we started the transaction, commit it even in case of import failure
439   bool commit_error = false;
440   if (make_tblock)
441     {
442       std::string cmd ("commit;");
443       Cell params;
444       Cell ptypes (1, 0);
445       Cell rtypes;
446       command c (*(oct_pq_conn.get_rep ()), cmd, params, ptypes, rtypes, fname);
447 
448       if (c.good ())
449         c.process_single_result ();
450 
451       if (! c.good ())
452         commit_error = true;
453     }
454 
455   if (import_error)
456     _p_error ("%s: large object import failed: %s",
457               fname.c_str (), msg.c_str ());
458 
459   if (commit_error)
460     _p_error ("%s: could not commit transaction", fname.c_str ());
461 
462   if (import_error || commit_error)
463     {
464       error ("%s failed", fname.c_str ());
465       return retval;
466     }
467 
468   retval = octave_value (octave_uint32 (oid));
469 
470   return retval;
471 }
472 
473 
474 DEFUN_DLD (pq_lo_export, args, ,
475            "-*- texinfo -*-\n\
476 @deftypefn {Loadable Function} pq_lo_export (@var{connection}, @var{oid}, @var{path})\n\
477 Exports the large object of Oid @var{oid} in the database associated with @var{connection} to the file @var{path} on the client side. If @var{path} starts with a @code{|}, it is taken as a shell commant to pipe to.\n\
478 @end deftypefn")
479 {
480   std::string fname ("pq_lo_export");
481 
482   octave_value retval;
483 
484   if (args.length () != 3 ||
485       args(0).type_id () != octave_pq_connection::static_type_id ())
486     {
487       print_usage ();
488 
489       return retval;
490     }
491 
492   std::string path;
493   CHECK_ERROR (path = args(2).string_value (), retval,
494                "%s: third argument can not be converted to a string",
495                fname.c_str ());
496 
497   bool to_pipe = false;
498   if (! path.empty () && path[0] == '|')
499     {
500       unsigned int pos;
501       if ((pos = path.find_first_not_of (" \t\n\r\f", 1))
502           == std::string::npos)
503         {
504           error ("%s: no command found to pipe to", fname.c_str ());
505 
506           return retval;
507         }
508       path.erase (0, pos);
509 
510       to_pipe = true;
511     }
512 
513   Oid oid = 0;
514   CHECK_ERROR (oid = args(1).uint_value (), retval,
515                "%s: second argument can not be converted to an oid",
516                fname.c_str ());
517 
518   const octave_base_value& rep = (args(0).get_rep ());
519 
520   const octave_pq_connection &oct_pq_conn =
521     dynamic_cast<const octave_pq_connection&> (rep);
522 
523   PGconn *conn = oct_pq_conn.get_rep ()->octave_pq_get_conn ();
524 
525   if (! conn)
526     {
527       error ("%s: connection not open", fname.c_str ());
528       return retval;
529     }
530 
531   bool make_tblock = false;
532   switch (PQtransactionStatus (conn))
533     {
534     case PQTRANS_IDLE:
535       make_tblock = true;
536       break;
537     case PQTRANS_INTRANS:
538       break;
539     case PQTRANS_INERROR:
540       error ("%s: can't manipulate large objects within a failed transaction block",
541              fname.c_str ());
542       return retval;
543     case PQTRANS_UNKNOWN:
544       error ("%s: connection is bad", fname.c_str ());
545       return retval;
546     default: // includes PQTRANS_ACTIVE
547       error ("%s: unexpected connection state", fname.c_str ());
548       return retval;
549     }
550 
551   if (make_tblock)
552     {
553       std::string cmd ("begin;");
554       Cell params;
555       Cell ptypes (1, 0);
556       Cell rtypes;
557       command c (*(oct_pq_conn.get_rep ()), cmd, params, ptypes, rtypes, fname);
558 
559       if (c.good ())
560         c.process_single_result ();
561 
562       if (! c.good ())
563         {
564           error ("%s: could not begin transaction", fname.c_str ());
565           return retval;
566         }
567     }
568 
569   bool export_error = false;
570   std::string msg;
571 
572   if (to_pipe)
573     {
574       lo_to_pipe tp (*(oct_pq_conn.get_rep ()), oid, path.c_str (), make_tblock, msg);
575 
576       make_tblock = false; // commit handled by destructor of lo_to_pipe
577 
578       if (! tp.valid ())
579         export_error = true;
580     }
581   else
582     if (lo_export (conn, oid, path.c_str ()) == -1)
583       {
584         export_error = true;
585         msg = PQerrorMessage (conn);
586       }
587 
588   // if we started the transaction, commit it even in case of export failure
589   bool commit_error = false;
590   if (make_tblock)
591     {
592       std::string cmd ("commit;");
593       Cell params;
594       Cell ptypes (1, 0);
595       Cell rtypes;
596       command c (*(oct_pq_conn.get_rep ()), cmd, params, ptypes, rtypes, fname);
597 
598       if (c.good ())
599         c.process_single_result ();
600 
601       if (! c.good ())
602         commit_error = true;
603     }
604 
605   if (export_error)
606     _p_error ("%s: large object export failed: %s",
607               fname.c_str (), msg.c_str ());
608 
609   if (commit_error)
610     _p_error ("%s: could not commit transaction", fname.c_str ());
611 
612   if (export_error || commit_error)
613     error ("%s failed", fname.c_str ());
614 
615   return retval;
616 }
617 
618 
619 DEFUN_DLD (pq_lo_unlink, args, ,
620            "-*- texinfo -*-\n\
621 @deftypefn {Loadable Function} pq_lo_unlink (@var{connection}, @var{oid})\n\
622 Removes the large object of Oid @var{oid} from the database associated with @var{connection}.\n\
623 @end deftypefn")
624 {
625   std::string fname ("pq_lo_unlink");
626 
627   octave_value retval;
628 
629   if (args.length () != 2 ||
630       args(0).type_id () != octave_pq_connection::static_type_id ())
631     {
632       print_usage ();
633 
634       return retval;
635     }
636 
637   Oid oid = 0;
638   CHECK_ERROR (oid = args(1).uint_value (), retval,
639                "%s: second argument can not be converted to an oid",
640                fname.c_str ());
641 
642   const octave_base_value& rep = (args(0).get_rep ());
643 
644   const octave_pq_connection &oct_pq_conn =
645     dynamic_cast<const octave_pq_connection&> (rep);
646 
647   PGconn *conn = oct_pq_conn.get_rep ()->octave_pq_get_conn ();
648 
649   if (! conn)
650     {
651       error ("%s: connection not open", fname.c_str ());
652       return retval;
653     }
654 
655   bool make_tblock = false;
656   switch (PQtransactionStatus (conn))
657     {
658     case PQTRANS_IDLE:
659       make_tblock = true;
660       break;
661     case PQTRANS_INTRANS:
662       break;
663     case PQTRANS_INERROR:
664       error ("%s: can't manipulate large objects within a failed transaction block",
665              fname.c_str ());
666       return retval;
667     case PQTRANS_UNKNOWN:
668       error ("%s: connection is bad", fname.c_str ());
669       return retval;
670     default: // includes PQTRANS_ACTIVE
671       error ("%s: unexpected connection state", fname.c_str ());
672       return retval;
673     }
674 
675   if (make_tblock)
676     {
677       std::string cmd ("begin;");
678       Cell params;
679       Cell ptypes (1, 0);
680       Cell rtypes;
681       command c (*(oct_pq_conn.get_rep ()), cmd, params, ptypes, rtypes, fname);
682 
683       if (c.good ())
684         c.process_single_result ();
685 
686       if (! c.good ())
687         {
688           error ("%s: could not begin transaction", fname.c_str ());
689           return retval;
690         }
691     }
692 
693   bool unlink_error = false;
694   std::string msg;
695 
696   if (lo_unlink (conn, oid) == -1)
697     {
698       unlink_error = true;
699       msg = PQerrorMessage (conn);
700     }
701 
702   // if we started the transaction, commit it even in case of unlink failure
703   bool commit_error = false;
704   if (make_tblock)
705     {
706       std::string cmd ("commit;");
707       Cell params;
708       Cell ptypes (1, 0);
709       Cell rtypes;
710       command c (*(oct_pq_conn.get_rep ()), cmd, params, ptypes, rtypes, fname);
711 
712       if (c.good ())
713         c.process_single_result ();
714 
715       if (! c.good ())
716         commit_error = true;
717     }
718 
719   if (unlink_error)
720     _p_error ("%s: large object unlink failed: %s",
721               fname.c_str (), msg.c_str ());
722 
723   if (commit_error)
724     _p_error ("%s: could not commit transaction", fname.c_str ());
725 
726   if (unlink_error || commit_error)
727     error ("%s failed", fname.c_str ());
728 
729   return retval;
730 }
731