1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
3 
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16 
17 #include <config.h>
18 
19 #include "data/psql-reader.h"
20 
21 #include <inttypes.h>
22 #include <math.h>
23 #include <stdlib.h>
24 
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
34 
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
38 
39 #include "gettext.h"
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
42 
43 
44 #if !PSQL_SUPPORT
45 struct casereader *
psql_open_reader(struct psql_read_info * info UNUSED,struct dictionary ** dict UNUSED)46 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
47 {
48   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
49 
50   return NULL;
51 }
52 
53 #else
54 
55 #include <stdint.h>
56 #include <libpq-fe.h>
57 
58 
/* Default width of string variables.  Also used as the rounding unit
   for text columns and as the guessed length when the query returns
   no rows. */
#define PSQL_DEFAULT_WIDTH 8

/* Object identifiers of the Postgres data types this reader knows
   about.  These macros must have the same values as the corresponding
   ones in catalog/pg_type.h from the Postgres source. */
#define BOOLOID            16
#define BYTEAOID           17
#define CHAROID            18
#define NAMEOID            19
#define INT8OID            20
#define INT2OID            21
#define INT4OID            23
#define TEXTOID            25
#define OIDOID             26
#define FLOAT4OID          700
#define FLOAT8OID          701
#define CASHOID            790
#define BPCHAROID          1042
#define VARCHAROID         1043
#define DATEOID            1082
#define TIMEOID            1083
#define TIMESTAMPOID       1114
#define TIMESTAMPTZOID     1184
#define INTERVALOID        1186
#define TIMETZOID          1266
#define NUMERICOID         1700
84 
85 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
86 
87 static struct ccase *psql_casereader_read (struct casereader *, void *);
88 
/* Casereader class for the Postgres reader, handed to
   casereader_create_sequential by psql_open_reader.  Only the read
   and destroy operations are supplied; the remaining two members are
   left NULL (presumably optional operations -- confirm against
   casereader-provider.h). */
static const struct casereader_class psql_casereader_class =
  {
    psql_casereader_read,
    psql_casereader_destroy,
    NULL,
    NULL,
  };
96 
/* State of one open Postgres reader. */
struct psql_reader
{
  PGconn *conn;                 /* Connection obtained from PQconnectdb. */
  PGresult *res;                /* Rows from the latest execution of
                                   fetch_cmd, or NULL if there are none. */
  int tuple;                    /* Index within RES of the next row to be
                                   turned into a case. */

  /* True if the server reported the "integer_datetimes" parameter
     as "on". */
  bool integer_datetimes;

  /* Value of calendar_gregorian_to_offset (2000, 1, 1, NULL).  The
     reader adds it to Postgres day counts and multiplies it by
     24 * 3600 when converting timestamps. */
  double postgres_epoch;

  struct caseproto *proto;      /* Reference to the dictionary's case
                                   prototype, released on destroy. */
  struct dictionary *dict;      /* Dictionary populated from the query's
                                   columns. */

  /* An array of variable pointers, indexed by psql column number,
     giving the pspp variable that holds each column. */
  struct variable **vmap;
  size_t vmapsize;              /* Number of elements in VMAP. */

  struct string fetch_cmd;      /* The "FETCH FORWARD n FROM pspp"
                                   command used to refill RES. */
  int cache_size;               /* Number of rows requested per fetch. */
};
118 
119 
120 static struct ccase *set_value (struct psql_reader *r);
121 
122 
123 
/* Copies the LEN bytes at IN_ to OUT_, converting to the host's byte
   order.  On a WORDS_BIGENDIAN host the bytes are copied unchanged;
   otherwise their order is reversed.  IN_ and OUT_ must not
   overlap. */
#if WORDS_BIGENDIAN
static void
data_to_native (const void *in_, void *out_, int len)
{
  const unsigned char *src = in_;
  unsigned char *dst = out_;
  int remaining;

  for (remaining = len; remaining > 0; --remaining)
    *dst++ = *src++;
}
#else
static void
data_to_native (const void *in_, void *out_, int len)
{
  const unsigned char *src = in_;
  unsigned char *dst = out_;
  int remaining = len;

  /* Fill the destination from its last byte towards its first. */
  while (remaining > 0)
    dst[--remaining] = *src++;
}
#endif
145 
146 
/* Reads sizeof (OUT) bytes from the buffer that *IN points to,
   stores them in OUT after conversion by data_to_native, and then
   advances *IN past the bytes consumed.

   IN must be a pointer to a byte pointer and OUT must be an lvalue.
   IN is evaluated twice, so it must not have side effects.  The
   internal temporary has a distinctive name so that it cannot
   capture an identifier used in OUT. */
#define GET_VALUE(IN, OUT) do { \
    size_t get_value_sz_ = sizeof (OUT); \
    data_to_native (*(IN), &(OUT), get_value_sz_); \
    *(IN) += get_value_sz_; \
} while (false)
152 
153 
#if 0
/* Debugging aid, currently compiled out.  Prints the L bytes at X to
   stdout twice: first as a row of two-digit hexadecimal values, then
   as a row in which printable bytes appear as characters and all
   other bytes as blanks. */
static void
dump (const unsigned char *x, int l)
{
  int i;

  for (i = 0; i < l ; ++i)
    {
      printf ("%02x ", x[i]);
    }

  putchar ('\n');

  for (i = 0; i < l ; ++i)
    {
      if (isprint (x[i]))
        printf ("%c ", x[i]);
      else
        printf ("   ");
    }

  putchar ('\n');
}
#endif
178 
179 static struct variable *
create_var(struct psql_reader * r,const struct fmt_spec * fmt,int width,const char * suggested_name,int col)180 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
181 	    int width, const char *suggested_name, int col)
182 {
183   unsigned long int vx = 0;
184   struct variable *var;
185   char *name;
186 
187   name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
188   var = dict_create_var (r->dict, name, width);
189   free (name);
190 
191   var_set_both_formats (var, fmt);
192 
193   if (col != -1)
194     {
195       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
196 
197       r->vmap[col] = var;
198       r->vmapsize = col + 1;
199     }
200 
201   return var;
202 }
203 
204 
205 
206 
207 /* Fill the cache */
208 static bool
reload_cache(struct psql_reader * r)209 reload_cache (struct psql_reader *r)
210 {
211   PQclear (r->res);
212   r->tuple = 0;
213 
214   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
215 
216   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
217     {
218       PQclear (r->res);
219       r->res = NULL;
220       return false;
221     }
222 
223   return true;
224 }
225 
226 
227 struct casereader *
psql_open_reader(struct psql_read_info * info,struct dictionary ** dict)228 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
229 {
230   int i;
231   int n_fields, n_tuples;
232   PGresult *qres = NULL;
233   casenumber n_cases = CASENUMBER_MAX;
234   const char *encoding;
235 
236   struct psql_reader *r = xzalloc (sizeof *r);
237   struct string query ;
238 
239   r->conn = PQconnectdb (info->conninfo);
240   if (NULL == r->conn)
241     {
242       msg (ME, _("Memory error whilst opening psql source"));
243       goto error;
244     }
245 
246   if (PQstatus (r->conn) != CONNECTION_OK)
247     {
248       msg (ME, _("Error opening psql source: %s."),
249 	   PQerrorMessage (r->conn));
250 
251       goto error;
252     }
253 
254   {
255     int ver_num = 0;
256     const char *vers = PQparameterStatus (r->conn, "server_version");
257 
258     sscanf (vers, "%d", &ver_num);
259 
260     if (ver_num < 8)
261       {
262 	msg (ME,
263 	     _("Postgres server is version %s."
264 	       " Reading from versions earlier than 8.0 is not supported."),
265 	     vers);
266 
267 	goto error;
268       }
269   }
270 
271   {
272     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
273 
274     r->integer_datetimes = (0 == c_strcasecmp (dt, "on"));
275   }
276 
277 #if USE_SSL
278   if (PQgetssl (r->conn) == NULL)
279 #endif
280     {
281       if (! info->allow_clear)
282 	{
283 	  msg (ME, _("Connection is unencrypted, "
284 		     "but unencrypted connections have not been permitted."));
285 	  goto error;
286 	}
287     }
288 
289   r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
290 
291   {
292     const int enc = PQclientEncoding (r->conn);
293 
294     /* According to section 22.2 of the Postgresql manual
295        a value of zero (SQL_ASCII) indicates
296        "a declaration of ignorance about the encoding".
297        Accordingly, we use the default encoding
298        if we find this value.
299     */
300     encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
301 
302     /* Create the dictionary and populate it */
303     *dict = r->dict = dict_create (encoding);
304   }
305 
306   const int version = PQserverVersion (r->conn);
307   ds_init_empty (&query);
308   /*
309     Versions before 9.1 don't have the REPEATABLE READ isolation level.
310     However according to <a12321aabb@gmail.com> if the server is in the
311     "hot standby" mode then SERIALIZABLE won't work.
312    */
313   ds_put_c_format (&query,
314 		   "BEGIN READ ONLY ISOLATION LEVEL %s; "
315 		   "DECLARE  pspp BINARY CURSOR FOR ",
316 		   (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ");
317 
318   ds_put_substring (&query, info->sql.ss);
319 
320   qres = PQexec (r->conn, ds_cstr (&query));
321   ds_destroy (&query);
322   if (PQresultStatus (qres) != PGRES_COMMAND_OK)
323     {
324       msg (ME, _("Error from psql source: %s."),
325 	   PQresultErrorMessage (qres));
326       goto error;
327     }
328 
329   PQclear (qres);
330 
331 
332   /* Now use the count() function to find the total number of cases
333      that this query returns.
334      Doing this incurs some overhead.  The server has to iterate every
335      case in order to find this number.  However, it's performed on the
336      server side, and in all except the most huge databases the extra
337      overhead will be worth the effort.
338      On the other hand, most PSPP functions don't need to know this.
339      The GUI is the notable exception.
340   */
341   ds_init_cstr (&query, "SELECT count (*) FROM (");
342   ds_put_substring (&query, info->sql.ss);
343   ds_put_cstr (&query, ") stupid_sql_standard");
344 
345   qres = PQexec (r->conn, ds_cstr (&query));
346   ds_destroy (&query);
347   if (PQresultStatus (qres) != PGRES_TUPLES_OK)
348     {
349       msg (ME, _("Error from psql source: %s."),
350 	   PQresultErrorMessage (qres));
351       goto error;
352     }
353   n_cases = atol (PQgetvalue (qres, 0, 0));
354   PQclear (qres);
355 
356   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
357   if (PQresultStatus (qres) != PGRES_TUPLES_OK)
358     {
359       msg (ME, _("Error from psql source: %s."),
360 	   PQresultErrorMessage (qres));
361       goto error;
362     }
363 
364   n_tuples = PQntuples (qres);
365   n_fields = PQnfields (qres);
366 
367   r->proto = NULL;
368   r->vmap = NULL;
369   r->vmapsize = 0;
370 
371   for (i = 0 ; i < n_fields ; ++i)
372     {
373       struct variable *var;
374       struct fmt_spec fmt = {FMT_F, 8, 2};
375       Oid type = PQftype (qres, i);
376       int width = 0;
377       int length ;
378 
379       /* If there are no data then make a finger in the air
380 	 guess at the contents */
381       if (n_tuples > 0)
382 	length = PQgetlength (qres, 0, i);
383       else
384 	length = PSQL_DEFAULT_WIDTH;
385 
386       switch (type)
387 	{
388 	case BOOLOID:
389         case OIDOID:
390 	case INT2OID:
391 	case INT4OID:
392         case INT8OID:
393         case FLOAT4OID:
394 	case FLOAT8OID:
395 	  fmt.type = FMT_F;
396 	  break;
397 	case CASHOID:
398 	  fmt.type = FMT_DOLLAR;
399 	  break;
400         case CHAROID:
401 	  fmt.type = FMT_A;
402 	  width = length > 0 ? length : 1;
403 	  fmt.d = 0;
404 	  fmt.w = 1;
405 	  break;
406         case TEXTOID:
407 	case VARCHAROID:
408 	case BPCHAROID:
409 	  fmt.type = FMT_A;
410 	  width = (info->str_width == -1) ?
411 	    ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
412 	  fmt.w = width;
413 	  fmt.d = 0;
414           break;
415 	case BYTEAOID:
416 	  fmt.type = FMT_AHEX;
417 	  width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
418 	  fmt.w = width * 2;
419 	  fmt.d = 0;
420 	  break;
421 	case INTERVALOID:
422 	  fmt.type = FMT_DTIME;
423 	  width = 0;
424 	  fmt.d = 0;
425 	  fmt.w = 13;
426 	  break;
427 	case DATEOID:
428 	  fmt.type = FMT_DATE;
429 	  width = 0;
430 	  fmt.w = 11;
431 	  fmt.d = 0;
432 	  break;
433 	case TIMEOID:
434 	case TIMETZOID:
435 	  fmt.type = FMT_TIME;
436 	  width = 0;
437 	  fmt.w = 11;
438 	  fmt.d = 0;
439 	  break;
440 	case TIMESTAMPOID:
441 	case TIMESTAMPTZOID:
442 	  fmt.type = FMT_DATETIME;
443 	  fmt.d = 0;
444 	  fmt.w = 22;
445 	  width = 0;
446 	  break;
447 	case NUMERICOID:
448 	  fmt.type = FMT_E;
449 	  fmt.d = 2;
450 	  fmt.w = 40;
451 	  width = 0;
452 	  break;
453 	default:
454           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
455 	  fmt.type = FMT_A;
456 	  width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
457 	  fmt.w = width ;
458 	  fmt.d = 0;
459 	  break;
460 	}
461 
462       if (width == 0 && fmt_is_string (fmt.type))
463 	fmt.w = width = PSQL_DEFAULT_WIDTH;
464 
465 
466       var = create_var (r, &fmt, width, PQfname (qres, i), i);
467       if (type == NUMERICOID && n_tuples > 0)
468 	{
469 	  const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
470 	  struct fmt_spec fmt;
471 	  int16_t n_digits, weight, dscale;
472 	  uint16_t sign;
473 
474 	  GET_VALUE (&vptr, n_digits);
475 	  GET_VALUE (&vptr, weight);
476 	  GET_VALUE (&vptr, sign);
477 	  GET_VALUE (&vptr, dscale);
478 
479 	  fmt.d = dscale;
480 	  fmt.type = FMT_E;
481 	  fmt.w = fmt_max_output_width (fmt.type) ;
482 	  fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
483 	  var_set_both_formats (var, &fmt);
484 	}
485 
486       /* Timezones need an extra variable */
487       switch (type)
488 	{
489 	case TIMETZOID:
490 	  {
491 	    struct string name;
492 	    ds_init_cstr (&name, var_get_name (var));
493 	    ds_put_cstr (&name, "-zone");
494 	    fmt.type = FMT_F;
495 	    fmt.w = 8;
496 	    fmt.d = 2;
497 
498 	    create_var (r, &fmt, 0, ds_cstr (&name), -1);
499 
500 	    ds_destroy (&name);
501 	  }
502 	  break;
503 
504 	case INTERVALOID:
505 	  {
506 	    struct string name;
507 	    ds_init_cstr (&name, var_get_name (var));
508 	    ds_put_cstr (&name, "-months");
509 	    fmt.type = FMT_F;
510 	    fmt.w = 3;
511 	    fmt.d = 0;
512 
513 	    create_var (r, &fmt, 0, ds_cstr (&name), -1);
514 
515 	    ds_destroy (&name);
516 	  }
517 	default:
518 	  break;
519 	}
520     }
521 
522   PQclear (qres);
523 
524   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
525   if (PQresultStatus (qres) != PGRES_COMMAND_OK)
526     {
527       PQclear (qres);
528       goto error;
529     }
530   PQclear (qres);
531 
532   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
533 
534   ds_init_empty (&r->fetch_cmd);
535   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
536 
537   reload_cache (r);
538   r->proto = caseproto_ref (dict_get_proto (*dict));
539 
540   return casereader_create_sequential
541     (NULL,
542      r->proto,
543      n_cases,
544      &psql_casereader_class, r);
545 
546  error:
547   dict_unref (*dict);
548 
549   psql_casereader_destroy (NULL, r);
550   return NULL;
551 }
552 
553 
554 static void
psql_casereader_destroy(struct casereader * reader UNUSED,void * r_)555 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
556 {
557   struct psql_reader *r = r_;
558   if (r == NULL)
559     return ;
560 
561   ds_destroy (&r->fetch_cmd);
562   free (r->vmap);
563   if (r->res) PQclear (r->res);
564   PQfinish (r->conn);
565   caseproto_unref (r->proto);
566 
567   free (r);
568 }
569 
570 
571 
572 static struct ccase *
psql_casereader_read(struct casereader * reader UNUSED,void * r_)573 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
574 {
575   struct psql_reader *r = r_;
576 
577   if (NULL == r->res || r->tuple >= r->cache_size)
578     {
579       if (! reload_cache (r))
580 	return false;
581     }
582 
583   return set_value (r);
584 }
585 
586 static struct ccase *
set_value(struct psql_reader * r)587 set_value (struct psql_reader *r)
588 {
589   struct ccase *c;
590   int n_vars;
591   int i;
592 
593   assert (r->res);
594 
595   n_vars = PQnfields (r->res);
596 
597   if (r->tuple >= PQntuples (r->res))
598     return NULL;
599 
600   c = case_create (r->proto);
601   case_set_missing (c);
602 
603 
604   for (i = 0 ; i < n_vars ; ++i)
605     {
606       Oid type = PQftype (r->res, i);
607       const struct variable *v = r->vmap[i];
608       union value *val = case_data_rw (c, v);
609 
610       union value *val1 = NULL;
611 
612       switch (type)
613 	{
614 	case INTERVALOID:
615 	case TIMESTAMPTZOID:
616 	case TIMETZOID:
617 	  if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
618 	    {
619 	      const struct variable *v1 = NULL;
620 	      v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
621 
622 	      val1 = case_data_rw (c, v1);
623 	    }
624 	  break;
625 	default:
626 	  break;
627 	}
628 
629 
630       if (PQgetisnull (r->res, r->tuple, i))
631 	{
632 	  value_set_missing (val, var_get_width (v));
633 
634 	  switch (type)
635 	    {
636 	    case INTERVALOID:
637 	    case TIMESTAMPTZOID:
638 	    case TIMETZOID:
639 	      val1->f = SYSMIS;
640 	      break;
641 	    default:
642 	      break;
643 	    }
644 	}
645       else
646 	{
647 	  const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
648 	  int length = PQgetlength (r->res, r->tuple, i);
649 
650 	  int var_width = var_get_width (v);
651 	  switch (type)
652 	    {
653 	    case BOOLOID:
654 	      {
655 		int8_t x;
656 		GET_VALUE (&vptr, x);
657 		val->f = x;
658 	      }
659 	      break;
660 
661 	    case OIDOID:
662 	    case INT2OID:
663 	      {
664 		int16_t x;
665 		GET_VALUE (&vptr, x);
666 		val->f = x;
667 	      }
668 	      break;
669 
670 	    case INT4OID:
671 	      {
672 		int32_t x;
673 		GET_VALUE (&vptr, x);
674 		val->f = x;
675 	      }
676 	      break;
677 
678 	    case INT8OID:
679 	      {
680 		int64_t x;
681 		GET_VALUE (&vptr, x);
682 		val->f = x;
683 	      }
684 	      break;
685 
686 	    case FLOAT4OID:
687 	      {
688 		float n;
689 		GET_VALUE (&vptr, n);
690 		val->f = n;
691 	      }
692 	      break;
693 
694 	    case FLOAT8OID:
695 	      {
696 		double n;
697 		GET_VALUE (&vptr, n);
698 		val->f = n;
699 	      }
700 	      break;
701 
702 	    case CASHOID:
703 	      {
704 		/* Postgres 8.3 uses 64 bits.
705 		   Earlier versions use 32 */
706 		switch (length)
707 		  {
708 		  case 8:
709 		    {
710 		      int64_t x;
711 		      GET_VALUE (&vptr, x);
712 		      val->f = x / 100.0;
713 		    }
714 		    break;
715 		  case 4:
716 		    {
717 		      int32_t x;
718 		      GET_VALUE (&vptr, x);
719 		      val->f = x / 100.0;
720 		    }
721 		    break;
722 		  default:
723 		    val->f = SYSMIS;
724 		    break;
725 		  }
726 	      }
727 	      break;
728 
729 	    case INTERVALOID:
730 	      {
731 		if (r->integer_datetimes)
732 		  {
733 		    uint32_t months;
734 		    uint32_t days;
735 		    uint32_t us;
736 		    uint32_t things;
737 
738 		    GET_VALUE (&vptr, things);
739 		    GET_VALUE (&vptr, us);
740 		    GET_VALUE (&vptr, days);
741 		    GET_VALUE (&vptr, months);
742 
743 		    val->f = us / 1000000.0;
744 		    val->f += days * 24 * 3600;
745 
746 		    val1->f = months;
747 		  }
748 		else
749 		  {
750 		    uint32_t days, months;
751 		    double seconds;
752 
753 		    GET_VALUE (&vptr, seconds);
754 		    GET_VALUE (&vptr, days);
755 		    GET_VALUE (&vptr, months);
756 
757 		    val->f = seconds;
758 		    val->f += days * 24 * 3600;
759 
760 		    val1->f = months;
761 		  }
762 	      }
763 	      break;
764 
765 	    case DATEOID:
766 	      {
767 		int32_t x;
768 
769 		GET_VALUE (&vptr, x);
770 
771 		val->f = (x + r->postgres_epoch) * 24 * 3600 ;
772 	      }
773 	      break;
774 
775 	    case TIMEOID:
776 	      {
777 		if (r->integer_datetimes)
778 		  {
779 		    uint64_t x;
780 		    GET_VALUE (&vptr, x);
781 		    val->f = x / 1000000.0;
782 		  }
783 		else
784 		  {
785 		    double x;
786 		    GET_VALUE (&vptr, x);
787 		    val->f = x;
788 		  }
789 	      }
790 	      break;
791 
792 	    case TIMETZOID:
793 	      {
794 		int32_t zone;
795 		if (r->integer_datetimes)
796 		  {
797 		    uint64_t x;
798 
799 
800 		    GET_VALUE (&vptr, x);
801 		    val->f = x / 1000000.0;
802 		  }
803 		else
804 		  {
805 		    double x;
806 
807 		    GET_VALUE (&vptr, x);
808 		    val->f = x ;
809 		  }
810 
811 		GET_VALUE (&vptr, zone);
812 		val1->f = zone / 3600.0;
813 	      }
814 	      break;
815 
816 	    case TIMESTAMPOID:
817 	    case TIMESTAMPTZOID:
818 	      {
819 		if (r->integer_datetimes)
820 		  {
821 		    int64_t x;
822 
823 		    GET_VALUE (&vptr, x);
824 
825 		    x /= 1000000;
826 
827 		    val->f = (x + r->postgres_epoch * 24 * 3600);
828 		  }
829 		else
830 		  {
831 		    double x;
832 
833 		    GET_VALUE (&vptr, x);
834 
835 		    val->f = (x + r->postgres_epoch * 24 * 3600);
836 		  }
837 	      }
838 	      break;
839 	    case TEXTOID:
840 	    case VARCHAROID:
841 	    case BPCHAROID:
842 	    case BYTEAOID:
843 	      memcpy (val->s, vptr, MIN (length, var_width));
844 	      break;
845 
846 	    case NUMERICOID:
847 	      {
848 		double f = 0.0;
849 		int i;
850 		int16_t n_digits, weight, dscale;
851 		uint16_t sign;
852 
853 		GET_VALUE (&vptr, n_digits);
854 		GET_VALUE (&vptr, weight);
855 		GET_VALUE (&vptr, sign);
856 		GET_VALUE (&vptr, dscale);
857 
858 #if 0
859 		{
860 		  struct fmt_spec fmt;
861 		  fmt.d = dscale;
862 		  fmt.type = FMT_E;
863 		  fmt.w = fmt_max_output_width (fmt.type) ;
864 		  fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
865 		  var_set_both_formats (v, &fmt);
866 		}
867 #endif
868 
869 		for (i = 0 ; i < n_digits;  ++i)
870 		  {
871 		    uint16_t x;
872 		    GET_VALUE (&vptr, x);
873 		    f += x * pow (10000, weight--);
874 		  }
875 
876 		if (sign == 0x4000)
877 		  f *= -1.0;
878 
879 		if (sign == 0xC000)
880 		  val->f = SYSMIS;
881 		else
882 		  val->f = f;
883 	      }
884 	      break;
885 
886 	    default:
887 	      val->f = SYSMIS;
888 	      break;
889 	    }
890 	}
891     }
892 
893   r->tuple++;
894 
895   return c;
896 }
897 
898 #endif
899