1 /*
2
3 Copyright (C) 2012-2019 Olaf Till <i7tiol@t-online.de>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; If not, see <http://www.gnu.org/licenses/>.
17
18 */
19
20 #include <octave/oct.h>
21 #include <octave/ov-struct.h>
22 #include <octave/Cell.h>
23 #include <octave/lo-ieee.h>
24
25 #include <iostream>
26 #include <fstream>
27
28 #include "command.h"
29 #include "converters.h"
30 #include "error-helpers.h"
31
32 #define COPY_HEADER_SIZE 19
33
34 #define COUT_RESIZE_STEP 1000 // resize result only after this number of rows
35
36 // This constructor is currently not used and probably broken.
command(octave_pq_connection_rep & connection,std::string & cmd,Cell & rtypes,std::string & who)37 command::command (octave_pq_connection_rep &connection, std::string &cmd,
38 Cell &rtypes, std::string &who)
39 : res (NULL), all_fetched (0), valid (1), conn (connection),
40 rettypes (rtypes), caller (who)
41 {
42 if (! (cptr = conn.octave_pq_get_conn ()))
43 {
44 valid = 0;
45 _p_error ("%s: connection not open", caller.c_str ());
46 }
47
48 if (! PQsendQuery (cptr, cmd.c_str ()))
49 {
50 valid = 0;
51 _p_error ("%s: could not dispatch command: %s", caller.c_str (),
52 PQerrorMessage (cptr));
53 }
54 else
55 {
56 res = PQgetResult (cptr);
57 check_first_result ();
58 }
59 }
60
command(octave_pq_connection_rep & connection,std::string & cmd,Cell & params,Cell & ptypes,Cell & rtypes,std::string & who)61 command::command (octave_pq_connection_rep &connection, std::string &cmd,
62 Cell ¶ms, Cell &ptypes, Cell &rtypes, std::string &who)
63 : res (NULL), all_fetched (1), valid (1), conn (connection),
64 rettypes (rtypes), caller (who)
65 {
66 if (! (cptr = conn.octave_pq_get_conn ()))
67 {
68 valid = 0;
69 _p_error ("%s: connection not open", caller.c_str ());
70 }
71
72 int npars = params.numel ();
73
74 char *vals [npars];
75 std::vector<oct_pq_dynvec_t> valsvec;
76 valsvec.resize (npars);
77 int pl [npars];
78 int pf [npars];
79 Oid oids [npars];
80
81 for (int i = 0; i < npars; i++)
82 {
83 pf[i] = 1; // means binary format
84
85 if (params(i).is_real_scalar () && params(i).isna ().bool_value ())
86 {
87 vals[i] = NULL;
88
89 oids[i] = 0;
90 }
91 else
92 {
93 pq_oct_type_t oct_type;
94 oct_pq_conv_t *conv;
95
96 if (ptypes(i).OV_ISEMPTY ())
97 {
98 oct_type = simple;
99
100 if (! (conv = pgtype_from_octtype (conn, params(i))))
101 {
102 valid = 0;
103 break;
104 }
105
106 // array not possible here
107 oids[i] = conv->oid;
108 }
109 else
110 {
111 bool err;
112 std::string s;
113 SET_ERR (s = ptypes(i).string_value (), err);
114 if (err)
115 {
116 valid = 0;
117 _p_error ("%s: parameter type specification no string",
118 caller.c_str ());
119 break;
120 }
121
122 if (! (conv = pgtype_from_spec (conn, s, oct_type)))
123 {
124 valid = 0;
125 break;
126 }
127
128 if (oct_type == array)
129 oids[i] = conv->aoid;
130 else
131 oids[i] = conv->oid;
132 }
133
134 switch (oct_type)
135 {
136 case simple:
137 if (conv->from_octave_bin (conn, params(i), valsvec[i]))
138 valid = 0;
139 break;
140
141 case array:
142 if (from_octave_bin_array (conn, params(i), valsvec[i], conv))
143 valid = 0;
144 break;
145
146 case composite:
147 if (from_octave_bin_composite (conn, params(i), valsvec[i], conv))
148 valid = 0;
149 break;
150
151 default:
152 // should not get here
153 valid = 0;
154 _p_error ("%s: internal error, undefined type identifier",
155 caller.c_str ());
156
157 }
158
159 if (! valid) break;
160
161 vals[i] = &(valsvec[i].front ());
162 pl[i] = valsvec[i].size ();
163 }
164 }
165
166 if (valid)
167 {
168 BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
169
170 res = PQexecParams (cptr, cmd.c_str (), npars, oids, vals, pl, pf, 1);
171
172 END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
173
174 check_first_result ();
175 }
176 }
177
get_elements_typeinfo(oct_pq_conv_t * conv,bool & err)178 octave_map command::get_elements_typeinfo (oct_pq_conv_t *conv, bool &err)
179 {
180 int nel = conv->el_oids.size ();
181
182 octave_map ret (dim_vector (1, nel));
183 Cell types_name (1, nel);
184 Cell types_array (1, nel);
185 Cell types_composite (1, nel);
186 Cell types_enum (1, nel);
187 Cell types_elements (1, nel);
188
189 for (int i = 0; i < nel; i++)
190 {
191 oct_pq_conv_t *el_conv;
192 pq_oct_type_t oct_type;
193
194 if (! (el_conv = pgtype_from_spec (conn, conv->el_oids[i],
195 conv->conv_cache[i], oct_type)))
196 {
197 err = true;
198 return ret;
199 }
200
201 types_name(i) = octave_value (el_conv->name);
202 types_array(i) = octave_value (oct_type == array);
203 types_enum(i) = octave_value (el_conv->is_enum);
204 types_composite(i) = octave_value (el_conv->is_composite);
205 if (el_conv->is_composite)
206 {
207 bool rec_err = false;
208 types_elements(i) = octave_value (get_elements_typeinfo (el_conv,
209 rec_err));
210 if (rec_err)
211 {
212 err = true;
213 return ret;
214 }
215 }
216 }
217
218 ret.assign ("name", types_name);
219 ret.assign ("is_array", types_array);
220 ret.assign ("is_composite", types_composite);
221 ret.assign ("is_enum", types_enum);
222 ret.assign ("elements", types_elements);
223
224 return ret;
225 }
226
process_single_result(const std::string & infile,const std::string & outfile,const Cell & cdata,const Cell & ctypes,bool coids,bool cin_var)227 octave_value command::process_single_result (const std::string &infile,
228 const std::string &outfile,
229 const Cell &cdata,
230 const Cell &ctypes,
231 bool coids,
232 bool cin_var)
233 {
234 octave_value retval;
235
236 // first result is already fetched
237 if (! res && (res = PQgetResult (cptr)))
238 state = PQresultStatus (res);
239
240 if (! res)
241 all_fetched = 1;
242 else
243 {
244 switch (state)
245 {
246 case PGRES_BAD_RESPONSE:
247 valid = 0;
248 _p_error ("%s: server response not understood", caller.c_str ());
249 break;
250 case PGRES_FATAL_ERROR:
251 valid = 0;
252 _p_error ("%s: fatal error: %s", caller.c_str (),
253 PQresultErrorMessage (res));
254 break;
255 case PGRES_COMMAND_OK:
256 retval = command_ok_handler ();
257 break;
258 case PGRES_TUPLES_OK:
259 retval = tuples_ok_handler ();
260 break;
261 case PGRES_COPY_OUT:
262 retval = copy_out_handler (outfile);
263 break;
264 case PGRES_COPY_IN:
265 retval = copy_in_handler (infile, cdata, ctypes, coids, cin_var);
266 break;
267 case PGRES_NONFATAL_ERROR:
268 break;
269 default:
270 valid = 0;
271 _p_error ("internal error, unexpected server response");
272 }
273
274 if (res) // could have been changed by a handler
275 {
276 PQclear (res);
277 res = NULL;
278 }
279 }
280
281 return retval;
282 }
283
tuples_ok_handler(void)284 octave_value command::tuples_ok_handler (void)
285 {
286 octave_map ret;
287
288 int nt = PQntuples (res);
289 int nf = PQnfields (res);
290
291 Cell data (nt, nf);
292 Cell columns (1, nf);
293 Cell types_name (1, nf);
294 Cell types_array (1, nf);
295 Cell types_composite (1, nf);
296 Cell types_enum (1, nf);
297 Cell types_elements (1, nf);
298 octave_map types (dim_vector (1, nf));
299
300 bool rtypes_given;
301 int l = rettypes.numel ();
302 if (l > 0)
303 {
304 if (l != nf)
305 {
306 valid = 0;
307 _p_error ("%s: wrong number of given returned types",
308 caller.c_str ());
309 return octave_value ();
310 }
311 rtypes_given = true;
312 }
313 else
314 rtypes_given = false;
315
316 for (int j = 0; j < nf; j++) // j is column
317 {
318 columns(j) = octave_value (PQfname (res, j));
319
320 int f = PQfformat (res, j);
321
322 oct_pq_to_octave_fp_t simple_type_to_octave;
323 to_octave_array_fp_t array_to_octave;
324 to_octave_composite_fp_t composite_to_octave;
325
326 oct_pq_conv_t *conv = NULL; // silence inadequate warning by
327 // initializing it here
328 pq_oct_type_t oct_type;
329
330 if (rtypes_given) // for internal reading of system tables
331 {
332 std::string type;
333 bool err;
334 SET_ERR (type = rettypes(j).string_value (), err);
335 if (err)
336 {
337 valid = 0;
338 _p_error ("%s: could not convert given type to string",
339 caller.c_str ());
340 break;
341 }
342 else if (! (conv = pgtype_from_spec (conn, type, oct_type)))
343 {
344 valid = 0;
345 break;
346 }
347 }
348 else if (! (conv = pgtype_from_spec (conn, PQftype (res, j), oct_type)))
349 {
350 valid = 0;
351 break;
352 }
353
354 if (f)
355 {
356 array_to_octave = &to_octave_bin_array;
357 composite_to_octave = &to_octave_bin_composite;
358 // will be NULL for non-simple converters
359 simple_type_to_octave = conv->to_octave_bin;
360 }
361 else
362 {
363 array_to_octave = &to_octave_str_array;
364 composite_to_octave = &to_octave_str_composite;
365 // will be NULL for non-simple converters
366 simple_type_to_octave = conv->to_octave_str;
367 }
368
369 // prepare type information
370 types_name(j) = octave_value (conv->name);
371 types_array(j) = octave_value (oct_type == array);
372 types_enum(j) = octave_value (conv->is_enum);
373 types_composite(j) = octave_value (conv->is_composite);
374 if (conv->is_composite)
375 {
376 // To implement here: recursively go through the elements
377 // and return respective recursive structures. This has the
378 // side effect that all converters necessary for this query
379 // will be looked up and cached (if they aren't already), so
380 // in the actual conversion of composite types only cache
381 // reads are performed, no map lookups.
382
383 bool err = false;
384
385 types_elements(j) = octave_value (get_elements_typeinfo (conv, err));
386
387 if (err)
388 {
389 valid = 0;
390 break;
391 }
392 }
393
394 for (int i = 0; i < nt; i++) // i is row
395 {
396 if (PQgetisnull (res, i, j))
397 data(i, j) = octave_value (octave_NA);
398 else
399 {
400 char *v = PQgetvalue (res, i, j);
401 int nb = PQgetlength (res, i, j);
402 octave_value ov;
403
404 switch (oct_type)
405 {
406 case simple:
407 if (simple_type_to_octave (conn, v, ov, nb))
408 valid = 0;
409 break;
410
411 case array:
412 if (array_to_octave (conn, v, ov, nb, conv))
413 valid = 0;
414 break;
415
416 case composite:
417 if (composite_to_octave (conn, v, ov, nb, conv))
418 valid = 0;
419 break;
420
421 default:
422 // should not get here
423 _p_error ("%s: internal error, undefined type identifier",
424 caller.c_str ());
425
426 valid = 0;
427 }
428
429 if (valid)
430 data(i, j) = ov;
431 else
432 break;
433 }
434 }
435
436 if (! valid)
437 break;
438 }
439
440 if (! valid)
441 return octave_value ();
442 else
443 {
444 ret.assign ("data", octave_value (data));
445 ret.assign ("columns", octave_value (columns));
446
447 types.setfield ("name", types_name);
448 types.setfield ("is_array", types_array);
449 types.setfield ("is_composite", types_composite);
450 types.setfield ("is_enum", types_enum);
451 types.setfield ("elements", types_elements);
452 ret.assign ("types", octave_value (types));
453
454 return octave_value (ret);
455 }
456 }
457
copy_out_handler(const std::string & outfile)458 octave_value command::copy_out_handler (const std::string &outfile)
459 {
460 octave_value retval;
461
462 if (! outfile.empty ())
463 {
464 // store unchecked output in file
465
466 std::ofstream ostr (outfile.c_str (), std::ios_base::out);
467 if (ostr.fail ())
468 {
469 valid = 0;
470 _p_error ("could not open output file %s", outfile.c_str ());
471 return retval;
472 }
473
474 char *data;
475 int nb;
476 while ((nb = PQgetCopyData (cptr, &data, 0)) > 0)
477 {
478 if (! (ostr.fail () || ostr.bad ()))
479 {
480 ostr.write (data, nb);
481 if (ostr.bad ())
482 _p_error ("write to file failed");
483 }
484 PQfreemem (data);
485 }
486
487 if (! ostr.bad ())
488 ostr.close ();
489
490 if (nb == -2)
491 {
492 valid = 0;
493 _p_error ("server error in copy-out: %s", PQerrorMessage (cptr));
494 }
495 else
496 {
497 PQclear (res);
498
499 if ((res = PQgetResult (cptr)))
500 {
501 if ((state = PQresultStatus (res)) == PGRES_FATAL_ERROR)
502 {
503 valid = 0;
504 _p_error ("server error in copy-out: %s",
505 PQerrorMessage (cptr));
506 }
507 }
508 else
509 {
510 valid = 0;
511 _p_error ("unexpectedly got no result information");
512 }
513 }
514 }
515 else
516 {
517 valid = 0;
518 _p_error ("no output file given");
519 }
520
521 return octave_value (std::string ("copy out"));
522 }
523
copy_in_handler(const std::string & infile,const Cell & data,const Cell & cin_types,bool oids,bool var)524 octave_value command::copy_in_handler (const std::string &infile,
525 const Cell &data,
526 const Cell &cin_types,
527 bool oids,
528 bool var)
529 {
530 octave_value retval;
531
532 #define OCT_PQ_READSIZE 4096
533
534 char buff [OCT_PQ_READSIZE];
535
536 if (! var)
537 {
538 // read unchecked input from file
539
540 if (infile.empty ())
541 {
542 valid = 0;
543
544 _p_error ("no input file given");
545
546 return retval;
547 }
548
549 std::ifstream istr (infile.c_str (), std::ios_base::in);
550 if (istr.fail ())
551 {
552 _p_error ("could not open input file %s", infile.c_str ());
553
554 PQputCopyEnd (cptr, "could not open input file");
555
556 _p_error ("server error: %s", PQerrorMessage (cptr));
557
558 valid = 0;
559
560 return retval;
561 }
562
563 do
564 {
565 istr.read (buff, OCT_PQ_READSIZE);
566
567 if (istr.bad ())
568 {
569 valid = 0;
570
571 _p_error ("could not read file %s", infile.c_str ());
572
573 break;
574 }
575 else
576 {
577 int nb;
578
579 if ((nb = istr.gcount ()) > 0)
580 if (PQputCopyData (cptr, buff, nb) == -1)
581 {
582 valid = 0;
583
584 _p_error ("%s", PQerrorMessage (cptr));
585
586 break;
587 }
588 }
589 }
590 while (! istr.eof ());
591
592 istr.close ();
593
594 if (! valid)
595 {
596 PQputCopyEnd (cptr, "copy-in interrupted");
597
598 _p_error ("%s", PQerrorMessage (cptr));
599 }
600 else
601 {
602 if (PQputCopyEnd (cptr, NULL) == -1)
603 {
604 valid = 0;
605 _p_error ("%s", PQerrorMessage (cptr));
606 }
607 else
608 {
609 PQclear (res);
610
611 if ((res = PQgetResult (cptr)))
612 {
613 if ((state = PQresultStatus (res)) == PGRES_FATAL_ERROR)
614 {
615 valid = 0;
616 _p_error ("server error in copy-in: %s",
617 PQerrorMessage (cptr));
618 }
619 }
620 else
621 {
622 valid = 0;
623 _p_error ("unexpectedly got no result information");
624 }
625 }
626 }
627 }
628 else
629 {
630 // copy in from octave variable
631
632 dim_vector dv = data.dims ();
633 octave_idx_type r = dv(0);
634 octave_idx_type c = dv(1);
635
636 octave_idx_type nf = PQnfields (res);
637 if (c != nf + oids)
638 {
639 valid = 0;
640
641 _p_error ("variable for copy-in has %i columns, but should have %i",
642 c, nf + oids);
643
644 PQputCopyEnd
645 (cptr, "variable for copy-in has wrong number of columns");
646 }
647 else if (! PQbinaryTuples (res))
648 {
649 valid = 0;
650
651 _p_error ("copy-in from variable must use binary mode");
652
653 PQputCopyEnd (cptr, "copy-in from variable must use binary mode");
654 }
655 else
656 {
657 for (octave_idx_type j = 0; j < nf; j++)
658 if (! PQfformat (res, j))
659 {
660 valid = 0;
661
662 _p_error ("copy-in from variable must use binary mode in all columns");
663
664 PQputCopyEnd (cptr, "copy-in from variable must use binary mode in all columns");
665
666 break;
667 }
668 }
669
670 if (! valid)
671 {
672 _p_error ("server error: %s", PQerrorMessage (cptr));
673
674 return retval;
675 }
676
677 char header [COPY_HEADER_SIZE];
678 memset (header, 0, COPY_HEADER_SIZE);
679 strcpy (header, "PGCOPY\n\377\r\n\0");
680 uint32_t tpu32 = htobe32 (uint32_t (oids) << 16);
681 memcpy (&header[11], &tpu32, 4);
682
683 char trailer [2];
684 int16_t tp16 = htobe16 (int16_t (-1));
685 memcpy (&trailer, &tp16, 2);
686
687 if (PQputCopyData (cptr, header, COPY_HEADER_SIZE) == -1)
688 {
689 PQputCopyEnd (cptr, "could not send header");
690
691 valid = 0;
692
693 _p_error ("server error: %s", PQerrorMessage (cptr));
694 }
695 else
696 {
697 oct_pq_conv_t *convs [c];
698 memset (convs, 0, sizeof (convs));
699 pq_oct_type_t oct_types [c];
700
701 for (octave_idx_type i = 0; i < r; i++) // i is row
702 {
703 int16_t fc = htobe16 (int16_t (nf));
704 if (PQputCopyData (cptr, (char *) &fc, 2) == -1)
705 {
706 _p_error ("%s", PQerrorMessage (cptr));
707
708 PQputCopyEnd (cptr, "error sending field count");
709
710 _p_error ("server error: %s", PQerrorMessage (cptr));
711
712 valid = 0;
713
714 break;
715 }
716
717 // j is column of argument data
718 for (octave_idx_type j = 0; j < c; j++)
719 {
720 if (data(i, j).is_real_scalar () &&
721 data(i, j).isna ().bool_value ())
722 {
723 int32_t t = htobe32 (int32_t (-1));
724 if (PQputCopyData (cptr, (char *) &t, 4) == -1)
725 {
726 valid = 0;
727
728 _p_error ("could not send NULL in copy-in");
729
730 break;
731 }
732 }
733 else
734 {
735 if (! convs [j])
736 {
737 if ((j == 0) && oids)
738 {
739 std::string t ("oid");
740 if (! (convs[0] =
741 pgtype_from_spec (conn, t, oct_types[0])))
742 {
743 valid = 0;
744
745 _p_error ("could not get converter for oid in copy-in");
746 break;
747 }
748 }
749 else
750 {
751 if (cin_types(j).OV_ISEMPTY ())
752 {
753 oct_types[j] = simple;
754
755 if (! (convs[j] =
756 pgtype_from_octtype (conn,
757 data(i, j))))
758 {
759 valid = 0;
760
761 _p_error ("could not determine type in column %i for copy-in",
762 j);
763
764 break;
765 }
766 }
767 else
768 {
769 bool err;
770 std::string s;
771 SET_ERR (s = cin_types(j).string_value (),
772 err);
773 if (err)
774 {
775 valid = 0;
776
777 _p_error ("column type specification no string");
778
779 break;
780 }
781
782 if (! (convs[j] =
783 pgtype_from_spec (conn, s,
784 oct_types[j])))
785 {
786 valid = 0;
787
788 _p_error ("invalid column type specification");
789
790 break;
791 }
792 }
793 }
794 } // ! convs [j]
795
796 oct_pq_dynvec_t val;
797
798 bool conversion_failed = false;
799 switch (oct_types[j])
800 {
801 case simple:
802 if (convs[j]->from_octave_bin (conn, data(i, j), val))
803 conversion_failed = true;
804 break;
805
806 case array:
807 if (from_octave_bin_array (conn, data(i, j), val,
808 convs[j]))
809 conversion_failed = true;
810 break;
811
812 case composite:
813 if (from_octave_bin_composite (conn, data(i, j), val,
814 convs[j]))
815 conversion_failed = true;
816 break;
817
818 default:
819 // should not get here
820 _p_error ("internal error, undefined type identifier");
821 conversion_failed = true;
822 }
823
824 if (conversion_failed)
825 {
826 valid = 0;
827 error ("could not convert data(%li, %li) for copy-in",
828 i, j);
829 }
830 else
831 {
832 uint32_t t = htobe32 (uint32_t (val.size ()));
833 if (PQputCopyData (cptr, (char *) &t, 4) == -1)
834 {
835 valid = 0;
836 _p_error ("could not send data length in copy-in");
837 }
838 else if (PQputCopyData (cptr, &(val.front ()),
839 val.size ()) == -1)
840 {
841 valid = 0;
842 _p_error ("could not send copy-in data");
843 }
844 }
845
846 if (! valid) break;
847 }
848 } // columns of argument data
849
850 if (! valid)
851 {
852 PQputCopyEnd (cptr, "error sending copy-in data");
853
854 _p_error ("server error: %s", PQerrorMessage (cptr));
855
856 break;
857 }
858 } // rows of argument data
859 }
860
861 if (valid)
862 if (PQputCopyData (cptr, trailer, 2) == -1)
863 {
864 valid = 0;
865
866 PQputCopyEnd (cptr, "could not send trailer");
867
868 _p_error ("%s", PQerrorMessage (cptr));
869 }
870
871 if (valid)
872 {
873 if (PQputCopyEnd (cptr, NULL) == -1)
874 {
875 valid = 0;
876 _p_error ("%s", PQerrorMessage (cptr));
877 }
878 else
879 {
880 PQclear (res);
881
882 if ((res = PQgetResult (cptr)))
883 {
884 if ((state = PQresultStatus (res)) == PGRES_FATAL_ERROR)
885 {
886 valid = 0;
887 _p_error ("server error in copy-in: %s",
888 PQerrorMessage (cptr));
889 }
890 }
891 else
892 {
893 valid = 0;
894 _p_error ("unexpectedly got no result information");
895 }
896 }
897 }
898 } // copy from variable
899
900 return octave_value (std::string ("copy in"));
901 }
902