1 /*
2 * RS-pgsql-copy.c
3 *
4 * $Id$
5 */
6
7 #include "RS-PostgreSQL.h"
8 #include <Rinternals.h>
9 #include <R.h> /* for Calloc/Free */
10 #include <ctype.h>
11 #include <stdlib.h>
12 #include <errno.h>
13 #define COPY_IN_BUFSIZE 8192
14
/*
 * Growable character buffer used while encoding values for COPY.
 * Initialized positionally as {data, bufsize, defaultSize}.
 */
typedef struct {
    char *data;          /* block from malloc/realloc, or NULL when empty */
    size_t bufsize;      /* number of bytes currently allocated at data */
    size_t defaultSize;  /* allocation sizes are rounded up to a multiple of this */
} R_StringBuffer;
20
/* Adapters for PQputCopyData and PQputCopyEnd, used together with a
   "COPY table FROM STDIN" query. */
23
24 /*
25 * Copies all content of the file specified with filename to the conHandle which
26 * has opened connection previously issued the COPY table from STDIN query
27 * the data is read from file sent to the database with PQputCopyData
28 * in chunks of COPY_IN_BUFSIZE.
29 * The copy ends when 0 byte could be read from the file and then the
30 * PQputCopyEnd is called to complete the copying.
31 */
32
33
34 static inline void chkpqcopydataerr(PGconn *, int);
35 s_object *
RS_PostgreSQL_CopyIn(Con_Handle * conHandle,s_object * filename)36 RS_PostgreSQL_CopyIn(Con_Handle * conHandle, s_object * filename)
37 {
38 S_EVALUATOR RS_DBI_connection * con;
39 PGconn *my_connection;
40
41 char *dyn_filename;
42 char copybuf[COPY_IN_BUFSIZE];
43 FILE* filehandle;
44 size_t len;
45 int pqretcode;
46
47 con = RS_DBI_getConnection(conHandle);
48 my_connection = (PGconn *) con->drvConnection;
49 dyn_filename = RS_DBI_copyString(CHR_EL(filename, 0));
50
51 filehandle=fopen(dyn_filename, "r");
52 if(filehandle == NULL){
53 char errmsg[1024];
54 snprintf(errmsg, 1024, "could not open file: %s", dyn_filename);
55 RS_DBI_errorMessage(dyn_filename, RS_DBI_ERROR);
56 return S_NULL_ENTRY;
57 }
58
59 while((len = fread(copybuf,1,COPY_IN_BUFSIZE, filehandle))){
60 pqretcode = PQputCopyData(my_connection, copybuf, len);
61 chkpqcopydataerr(my_connection, pqretcode);
62
63 }
64 PQputCopyEnd(my_connection, NULL);
65 fclose(filehandle);
66
67 free(dyn_filename);
68 return S_NULL_ENTRY;
69 }
70
71
/*
 * Makes sure buf->data can hold at least blen + 1 bytes (a string of blen
 * characters plus its terminating NUL) and returns buf->data.
 *
 * Nothing is done when the current buffer is already larger than blen.
 * Otherwise the size is rounded up to a multiple of buf->defaultSize.  The
 * block comes from malloc on first use (and starts as "") or from realloc
 * (existing contents kept).  On allocation failure the buffer is left empty
 * (data NULL, bufsize 0) and error() is raised.
 */
void *R_AllocStringBuffer(size_t blen, R_StringBuffer *buf)
{
    size_t blen1, bsize = buf->defaultSize;
    char *newdata;

    if(blen < buf->bufsize) return buf->data;
    /* a zero granularity would divide by zero below */
    if(bsize == 0) bsize = 1;
    blen1 = blen = (blen + 1);
    blen = (blen / bsize) * bsize;
    if(blen < blen1) blen += bsize;

    if(buf->data == NULL) {
        newdata = malloc(blen);
        /* only touch the block once we know it was allocated */
        if(newdata) newdata[0] = '\0';
    } else {
        newdata = realloc(buf->data, blen);
        /* on failure realloc leaves the old block allocated; release it
           rather than leaking it when the pointer is dropped below */
        if(!newdata) free(buf->data);
    }
    buf->data = newdata;
    buf->bufsize = blen;
    if(!buf->data) {
        buf->bufsize = 0;
        /* don't translate internal error message */
        error("could not allocate memory (%u Mb) in C function 'R_AllocStringBuffer'",
              (unsigned int) (blen / 1024 / 1024));
    }
    return buf->data;
}
95
96
97 void
R_FreeStringBuffer(R_StringBuffer * buf)98 R_FreeStringBuffer(R_StringBuffer *buf)
99 {
100 if (buf->data != NULL) {
101 free(buf->data);
102 buf->bufsize = 0;
103 buf->data = NULL;
104 }
105 }
106
107
isna(SEXP x,int indx)108 static Rboolean isna(SEXP x, int indx)
109 {
110 Rcomplex rc;
111 switch(TYPEOF(x)) {
112 case LGLSXP:
113 return LOGICAL(x)[indx] == NA_LOGICAL;
114 break;
115 case INTSXP:
116 return INTEGER(x)[indx] == NA_INTEGER;
117 break;
118 case REALSXP:
119 return ISNAN(REAL(x)[indx]);
120 break;
121 case STRSXP:
122 return STRING_ELT(x, indx) == NA_STRING;
123 break;
124 case CPLXSXP:
125 rc = COMPLEX(x)[indx];
126 return ISNAN(rc.r) || ISNAN(rc.i);
127 break;
128 default:
129 break;
130 }
131 return FALSE;
132 }
133
134 /* a version of EncodeElement with different escaping of char strings */
135 static const char
EncodeElementSconn(PGconn * my_connection,SEXP x,int indx,R_StringBuffer * buff,char cdec)136 *EncodeElementSconn(PGconn* my_connection, SEXP x, int indx,
137 R_StringBuffer *buff, char cdec)
138 {
139 buff->data[0]='\0';
140 switch(TYPEOF(x)) {
141 case RAWSXP:{
142 const unsigned char*rawdata;
143 unsigned char* escapedstring;
144 int len;
145 size_t escaped_length;
146 len = LENGTH(x);
147 rawdata=RAW(x);
148 escapedstring = PQescapeByteaConn(my_connection, rawdata, len, &escaped_length);
149 memcpy(buff->data, escapedstring, escaped_length);
150 buff->data[escaped_length]='\0';
151 free(escapedstring);
152 return buff->data;
153
154
155 }
156 case STRSXP:
157 {
158 const char *s = translateCharUTF8(STRING_ELT(x, indx));
159 char *u, *cbuf;
160 int j, len, blen, offset;
161 len = strlen(s);
162 blen = len * 2 + 1;
163 R_AllocStringBuffer(blen, buff);
164 u = cbuf = buff->data;
165 offset = 0;
166 for (j = 0; j < len; j++){
167 switch(s[offset+j]){
168 /* http://www.postgresql.org/docs/8.1/static/sql-copy.html */
169 case '\b':
170 *u++ = '\\';
171 *u++ = 'b';
172 break;
173 case '\f':
174 *u++ = '\\';
175 *u++ = 'f';
176 break;
177 case '\n':
178 *u++ = '\\';
179 *u++ = 'n';
180 break;
181 case '\r':
182 *u++ = '\\';
183 *u++ = 'r';
184 break;
185 case '\t':
186 *u++ = '\\';
187 *u++ = 't';
188 break;
189 case '\v':
190 *u++ = '\\';
191 *u++ = 'v';
192 break;
193 case '\\':
194 *u++ = '\\';
195 *u++ = '\\';
196 break;
197 default:
198 *u++ = s[offset+j];
199 }
200 }
201 *u = '\0';
202 return buff->data;
203 }
204 case LGLSXP:{
205 int value;
206 value = LOGICAL(x)[indx];
207 if(value == TRUE) return "true";
208 if(value == FALSE) return "false";
209 return "\\N";
210 }
211 case INTSXP:{
212 int value;
213 value = INTEGER(x)[indx];
214 if(ISNA(value)) return "\\N";
215 snprintf(buff->data, buff->bufsize, "%d", value);
216 return buff->data;
217 }
218 case REALSXP:{
219 double value = REAL(x)[indx];
220 if (!R_FINITE(value)) {
221 if(ISNA(value)) return "\\N";
222 else if(ISNAN(value)) return "NaN";
223 else if(value > 0) return "Inf";
224 else return "-Inf";
225 }
226 snprintf(buff->data, buff->bufsize, "%.15g", value);
227 return buff->data;
228 }
229 default:
230 return buff->data;
231 }
232 return buff->data;
233 }
234
235 static inline void
chkpqcopydataerr(PGconn * my_connection,int pqretcode)236 chkpqcopydataerr(PGconn *my_connection, int pqretcode)
237 {
238 if(pqretcode == -1){
239 char * pqerrmsg = PQerrorMessage(my_connection);
240 char * rserrmsg;
241 char * format = "PQputCopyData failed: %s";
242 size_t len = strlen(pqerrmsg) + strlen(format) + 1;
243 rserrmsg = R_alloc(len, 1);
244 snprintf(rserrmsg, len, format, pqerrmsg);
245 RS_DBI_errorMessage(rserrmsg, RS_DBI_ERROR);
246 }
247 }
248
249 SEXP
RS_PostgreSQL_CopyInDataframe(Con_Handle * conHandle,SEXP x,SEXP nrow,SEXP ncol)250 RS_PostgreSQL_CopyInDataframe(Con_Handle * conHandle, SEXP x, SEXP nrow, SEXP ncol)
251 {
252 S_EVALUATOR RS_DBI_connection * con;
253 int nr, nc, i, j;
254 const char *cna ="\\N", *tmp=NULL /* -Wall */;
255 char cdec = '.';
256
257 PGconn *my_connection;
258 int pqretcode;
259 nr = asInteger(nrow);
260 nc = asInteger(ncol);
261 const int buff_threshold = 8000;
262
263 con = RS_DBI_getConnection(conHandle);
264 my_connection = (PGconn *) con->drvConnection;
265
266 if(isVectorList(x)) { /* A data frame */
267 R_StringBuffer rstrbuf = {NULL, 0, 10000};
268
269 char *strBuf = Calloc(buff_threshold * 2 + 2, char); /* + 2 for '\t' or '\n' plus '\0'*/
270 char *strendp = strBuf;
271 SEXP *levels;
272 *strendp = '\0';
273
274 R_AllocStringBuffer(10000, &rstrbuf);
275 /* handle factors internally, check integrity */
276 levels = (SEXP *) R_alloc(nc, sizeof(SEXP));
277 for(j = 0; j < nc; j++) {
278 SEXP xj;
279 xj = VECTOR_ELT(x, j);
280 if(LENGTH(xj) != nr)
281 error(("corrupt data frame -- length of column %d does not not match nrows"), j+1);
282 if(inherits(xj, "factor")) {
283 levels[j] = getAttrib(xj, R_LevelsSymbol);
284 } else levels[j] = R_NilValue;
285 }
286
287 for(i = 0; i < nr; i++) {
288 for(j = 0; j < nc; j++) {
289 SEXP xj;
290 xj = VECTOR_ELT(x, j);
291 if(j > 0){
292 *strendp++ = '\t';/*need no size count check here*/
293 }
294 if(isna(xj, i)) tmp = cna;
295 else {
296 if(!isNull(levels[j])) {
297 /* We cannot assume factors have integer levels */
298 if(TYPEOF(xj) == INTSXP){
299 tmp = EncodeElementSconn(my_connection, levels[j], INTEGER(xj)[i] - 1,
300 &rstrbuf, cdec);
301 }else if(TYPEOF(xj) == REALSXP){
302 tmp = EncodeElementSconn(my_connection, levels[j], REAL(xj)[i] - 1,
303 &rstrbuf, cdec);
304 }else
305 error("column %s claims to be a factor but does not have numeric codes", j+1);
306 } else {
307 tmp = EncodeElementSconn(my_connection, xj, i,
308 &rstrbuf, cdec);
309 }
310 }
311 {
312 size_t n;
313 size_t len = strendp - strBuf;
314 n = strlen(tmp);
315 if (len + n < buff_threshold){
316 memcpy(strendp, tmp, n);/* we already know the length */
317 strendp += n;
318 }else if(n < buff_threshold){ /*copy and flush*/
319 memcpy(strendp, tmp, n);/* we already know the length */
320 pqretcode = PQputCopyData(my_connection, strBuf, len + n);
321 chkpqcopydataerr(my_connection, pqretcode);
322 strendp = strBuf;
323 }else{ /*flush and copy current*/
324 if(len > 0){
325 pqretcode = PQputCopyData(my_connection, strBuf, len);
326 chkpqcopydataerr(my_connection, pqretcode);
327 strendp = strBuf;
328 }
329 pqretcode = PQputCopyData(my_connection, tmp, n);
330 chkpqcopydataerr(my_connection, pqretcode);
331 }
332 }
333 }
334 *strendp = '\n'; strendp +=1; *strendp='\0';
335 }
336 pqretcode = PQputCopyData(my_connection, strBuf, strendp - strBuf);
337 chkpqcopydataerr(my_connection, pqretcode);
338 Free(strBuf);
339 R_FreeStringBuffer(&rstrbuf);
340 }
341 PQputCopyEnd(my_connection, NULL);
342 return R_NilValue;
343 }
344
345
346