1 /*
2  *    RS-pgsql-copy.c
3  *
4  *    $Id$
5  */
6 
7 #include "RS-PostgreSQL.h"
8 #include <Rinternals.h>
9 #include <R.h>  /* for Calloc/Free */
10 #include <ctype.h>
11 #include <stdlib.h>
12 #include <errno.h>
13 #define COPY_IN_BUFSIZE 8192
14 
/*
 * Growable character buffer shared by the COPY encoding helpers.
 * Field order matters: the structure is initialised positionally
 * (e.g. {NULL, 0, 10000}) elsewhere in this file.
 */
typedef struct R_StringBuffer {
    char *data;          /* heap storage, or NULL while nothing is allocated */
    size_t bufsize;      /* number of bytes currently allocated for data */
    size_t defaultSize;  /* allocation granularity: sizes are rounded up to a multiple of this */
} R_StringBuffer;
20 
21 /* adapter for PQputCopyData and PQputCopyEnd
22    which is used in conjunction with COPY table from STDIN */
23 
/*
 * Copies the entire content of the file named by filename to the connection
 * behind conHandle, on which a COPY table FROM STDIN query must already have
 * been issued.  The data is read from the file and sent to the database with
 * PQputCopyData in chunks of COPY_IN_BUFSIZE bytes.
 * The copy ends when no more bytes can be read from the file; PQputCopyEnd
 * is then called to complete the copying.
 */
32 
33 
34 static inline void chkpqcopydataerr(PGconn *, int);
35 s_object *
RS_PostgreSQL_CopyIn(Con_Handle * conHandle,s_object * filename)36 RS_PostgreSQL_CopyIn(Con_Handle * conHandle, s_object * filename)
37 {
38     S_EVALUATOR RS_DBI_connection * con;
39     PGconn *my_connection;
40 
41     char *dyn_filename;
42     char copybuf[COPY_IN_BUFSIZE];
43     FILE* filehandle;
44     size_t len;
45     int pqretcode;
46 
47     con = RS_DBI_getConnection(conHandle);
48     my_connection = (PGconn *) con->drvConnection;
49     dyn_filename = RS_DBI_copyString(CHR_EL(filename, 0));
50 
51     filehandle=fopen(dyn_filename, "r");
52     if(filehandle == NULL){
53         char errmsg[1024];
54         snprintf(errmsg, 1024, "could not open file: %s", dyn_filename);
55         RS_DBI_errorMessage(dyn_filename, RS_DBI_ERROR);
56         return S_NULL_ENTRY;
57     }
58 
59     while((len = fread(copybuf,1,COPY_IN_BUFSIZE, filehandle))){
60         pqretcode = PQputCopyData(my_connection, copybuf, len);
61         chkpqcopydataerr(my_connection, pqretcode);
62 
63     }
64     PQputCopyEnd(my_connection, NULL);
65     fclose(filehandle);
66 
67     free(dyn_filename);
68     return S_NULL_ENTRY;
69 }
70 
71 
/*
 * Ensures that buf can hold a string of blen characters plus the
 * terminating '\0'.
 *
 * blen  required string length, not counting the terminator.
 * buf   buffer to grow; buf->defaultSize is the allocation granularity
 *       (a granularity of 0 means "allocate exactly what is needed").
 *
 * Returns buf->data.  The storage is left untouched when it is already
 * larger than blen; otherwise it is (re)allocated to blen + 1 rounded up to
 * a multiple of the granularity.  A freshly allocated buffer starts out as
 * the empty string.  On allocation failure the old storage is released, the
 * buffer is reset to {NULL, 0} and an R error is raised.
 */
void *R_AllocStringBuffer(size_t blen, R_StringBuffer *buf)
{
    size_t needed, rounded, bsize = buf->defaultSize;
    char *grown;

    if (blen < buf->bufsize) return buf->data;

    needed = blen + 1;
    if (bsize == 0) {
        rounded = needed;           /* avoid dividing by a zero granularity */
    } else {
        rounded = (needed / bsize) * bsize;
        if (rounded < needed) rounded += bsize;
    }

    if (buf->data == NULL)
        grown = malloc(rounded);
    else
        grown = realloc(buf->data, rounded);

    if (grown == NULL) {
        /* realloc leaves the old block valid on failure: release it here so
           that it is not leaked when the error unwinds */
        free(buf->data);
        buf->data = NULL;
        buf->bufsize = 0;
        /* don't translate internal error message */
        error("could not allocate memory (%u Mb) in C function 'R_AllocStringBuffer'",
              (unsigned int) (rounded / 1024 / 1024));
        return NULL;
    }

    if (buf->data == NULL)
        grown[0] = '\0';            /* first allocation: start as empty string */
    buf->data = grown;
    buf->bufsize = rounded;
    return buf->data;
}
95 
96 
97 void
R_FreeStringBuffer(R_StringBuffer * buf)98 R_FreeStringBuffer(R_StringBuffer *buf)
99 {
100     if (buf->data != NULL) {
101         free(buf->data);
102         buf->bufsize = 0;
103         buf->data = NULL;
104     }
105 }
106 
107 
isna(SEXP x,int indx)108 static Rboolean isna(SEXP x, int indx)
109 {
110     Rcomplex rc;
111     switch(TYPEOF(x)) {
112     case LGLSXP:
113         return LOGICAL(x)[indx] == NA_LOGICAL;
114         break;
115     case INTSXP:
116         return INTEGER(x)[indx] == NA_INTEGER;
117         break;
118     case REALSXP:
119         return ISNAN(REAL(x)[indx]);
120         break;
121     case STRSXP:
122         return STRING_ELT(x, indx) == NA_STRING;
123         break;
124     case CPLXSXP:
125         rc = COMPLEX(x)[indx];
126         return ISNAN(rc.r) || ISNAN(rc.i);
127         break;
128     default:
129         break;
130     }
131     return FALSE;
132 }
133 
134 /* a version of EncodeElement with different escaping of char strings */
135 static const char
EncodeElementSconn(PGconn * my_connection,SEXP x,int indx,R_StringBuffer * buff,char cdec)136 *EncodeElementSconn(PGconn* my_connection, SEXP x, int indx,
137                 R_StringBuffer *buff, char cdec)
138 {
139     buff->data[0]='\0';
140     switch(TYPEOF(x)) {
141        case RAWSXP:{
142             const unsigned char*rawdata;
143             unsigned char* escapedstring;
144             int len;
145             size_t escaped_length;
146             len = LENGTH(x);
147             rawdata=RAW(x);
148             escapedstring = PQescapeByteaConn(my_connection, rawdata, len, &escaped_length);
149             memcpy(buff->data, escapedstring, escaped_length);
150             buff->data[escaped_length]='\0';
151             free(escapedstring);
152             return buff->data;
153 
154 
155        }
156        case STRSXP:
157        {
158 	    const char *s = translateCharUTF8(STRING_ELT(x, indx));
159 	    char *u, *cbuf;
160             int j, len, blen, offset;
161 	    len = strlen(s);
162 	    blen = len * 2 + 1;
163             R_AllocStringBuffer(blen, buff);
164 	    u = cbuf = buff->data;
165 	    offset = 0;
166 	    for (j = 0; j < len; j++){
167                 switch(s[offset+j]){
168 /* http://www.postgresql.org/docs/8.1/static/sql-copy.html */
169                     case '\b':
170                         *u++ = '\\';
171                         *u++ = 'b';
172                         break;
173                     case '\f':
174                         *u++ = '\\';
175                         *u++ = 'f';
176                         break;
177                     case '\n':
178                         *u++ = '\\';
179                         *u++ = 'n';
180                         break;
181                     case '\r':
182                         *u++ = '\\';
183                         *u++ = 'r';
184                         break;
185                     case '\t':
186                         *u++ = '\\';
187                         *u++ = 't';
188                         break;
189                     case '\v':
190                         *u++ = '\\';
191                         *u++ = 'v';
192                         break;
193                     case '\\':
194                         *u++ = '\\';
195                         *u++ = '\\';
196                         break;
197                     default:
198 		        *u++ = s[offset+j];
199                 }
200             }
201             *u = '\0';
202             return buff->data;
203         }
204         case LGLSXP:{
205             int value;
206             value = LOGICAL(x)[indx];
207             if(value == TRUE) return "true";
208             if(value == FALSE) return "false";
209             return "\\N";
210         }
211         case INTSXP:{
212             int value;
213             value = INTEGER(x)[indx];
214             if(ISNA(value)) return "\\N";
215             snprintf(buff->data, buff->bufsize, "%d", value);
216             return buff->data;
217         }
218         case REALSXP:{
219             double value = REAL(x)[indx];
220             if (!R_FINITE(value)) {
221                 if(ISNA(value)) return "\\N";
222                 else if(ISNAN(value)) return "NaN";
223                 else if(value > 0) return "Inf";
224                 else return "-Inf";
225             }
226             snprintf(buff->data, buff->bufsize, "%.15g", value);
227             return buff->data;
228         }
229         default:
230             return buff->data;
231     }
232     return buff->data;
233 }
234 
235 static inline void
chkpqcopydataerr(PGconn * my_connection,int pqretcode)236 chkpqcopydataerr(PGconn *my_connection, int pqretcode)
237 {
238     if(pqretcode == -1){
239         char * pqerrmsg = PQerrorMessage(my_connection);
240         char * rserrmsg;
241         char * format = "PQputCopyData failed: %s";
242         size_t len = strlen(pqerrmsg) + strlen(format) + 1;
243         rserrmsg = R_alloc(len, 1);
244         snprintf(rserrmsg, len, format, pqerrmsg);
245         RS_DBI_errorMessage(rserrmsg, RS_DBI_ERROR);
246     }
247 }
248 
249 SEXP
RS_PostgreSQL_CopyInDataframe(Con_Handle * conHandle,SEXP x,SEXP nrow,SEXP ncol)250 RS_PostgreSQL_CopyInDataframe(Con_Handle * conHandle, SEXP x, SEXP nrow, SEXP ncol)
251 {
252     S_EVALUATOR RS_DBI_connection * con;
253     int nr, nc, i, j;
254     const char *cna ="\\N", *tmp=NULL /* -Wall */;
255     char cdec = '.';
256 
257     PGconn *my_connection;
258     int pqretcode;
259     nr = asInteger(nrow);
260     nc = asInteger(ncol);
261     const int buff_threshold = 8000;
262 
263     con = RS_DBI_getConnection(conHandle);
264     my_connection = (PGconn *) con->drvConnection;
265 
266     if(isVectorList(x)) { /* A data frame */
267         R_StringBuffer rstrbuf = {NULL, 0, 10000};
268 
269         char *strBuf  = Calloc(buff_threshold * 2 + 2, char); /* + 2 for '\t' or '\n' plus '\0'*/
270         char *strendp = strBuf;
271         SEXP *levels;
272         *strendp = '\0';
273 
274         R_AllocStringBuffer(10000, &rstrbuf);
275 	/* handle factors internally, check integrity */
276 	levels = (SEXP *) R_alloc(nc, sizeof(SEXP));
277 	for(j = 0; j < nc; j++) {
278             SEXP xj;
279 	    xj = VECTOR_ELT(x, j);
280 	    if(LENGTH(xj) != nr)
281 		error(("corrupt data frame -- length of column %d does not not match nrows"), j+1);
282 	    if(inherits(xj, "factor")) {
283 		levels[j] = getAttrib(xj, R_LevelsSymbol);
284 	    } else levels[j] = R_NilValue;
285 	}
286 
287 	for(i = 0; i < nr; i++) {
288 	    for(j = 0; j < nc; j++) {
289                 SEXP xj;
290 		xj = VECTOR_ELT(x, j);
291 		if(j > 0){
292                     *strendp++ =  '\t';/*need no size count check here*/
293                 }
294 		if(isna(xj, i)) tmp = cna;
295 		else {
296 		    if(!isNull(levels[j])) {
297 			/* We cannot assume factors have integer levels */
298 			if(TYPEOF(xj) == INTSXP){
299                             tmp = EncodeElementSconn(my_connection, levels[j], INTEGER(xj)[i] - 1,
300                                                  &rstrbuf, cdec);
301 			}else if(TYPEOF(xj) == REALSXP){
302                             tmp = EncodeElementSconn(my_connection, levels[j], REAL(xj)[i] - 1,
303                                                  &rstrbuf, cdec);
304 			}else
305 			    error("column %s claims to be a factor but does not have numeric codes", j+1);
306 		    } else {
307 			tmp = EncodeElementSconn(my_connection, xj, i,
308 					     &rstrbuf, cdec);
309 		    }
310 		}
311                 {
312                     size_t n;
313                     size_t len = strendp - strBuf;
314                     n = strlen(tmp);
315                     if (len + n < buff_threshold){
316                         memcpy(strendp, tmp, n);/* we already know the length */
317                         strendp += n;
318                     }else if(n < buff_threshold){ /*copy and flush*/
319                         memcpy(strendp, tmp, n);/* we already know the length */
320                         pqretcode = PQputCopyData(my_connection, strBuf, len + n);
321               	        chkpqcopydataerr(my_connection, pqretcode);
322                         strendp = strBuf;
323                     }else{ /*flush and copy current*/
324                         if(len > 0){
325                             pqretcode = PQputCopyData(my_connection, strBuf, len);
326                             chkpqcopydataerr(my_connection, pqretcode);
327                             strendp = strBuf;
328                         }
329                         pqretcode = PQputCopyData(my_connection, tmp, n);
330                         chkpqcopydataerr(my_connection, pqretcode);
331                     }
332                 }
333 	    }
334             *strendp = '\n'; strendp +=1; *strendp='\0';
335 	}
336         pqretcode = PQputCopyData(my_connection, strBuf, strendp - strBuf);
337         chkpqcopydataerr(my_connection, pqretcode);
338         Free(strBuf);
339         R_FreeStringBuffer(&rstrbuf);
340     }
341     PQputCopyEnd(my_connection, NULL);
342     return R_NilValue;
343 }
344 
345 
346