1 
2 /*
3  * Copyright (C) Yichun Zhang (agentzh)
4  */
5 
6 
7 #ifndef DDEBUG
8 #define DDEBUG 0
9 #endif
10 #include "ddebug.h"
11 
12 
13 #include "ngx_http_rds_csv_filter_module.h"
14 #include "ngx_http_rds_csv_output.h"
15 #include "ngx_http_rds_csv_util.h"
16 #include "resty_dbd_stream.h"
17 
18 
19 static u_char *ngx_http_rds_csv_request_mem(ngx_http_request_t *r,
20     ngx_http_rds_csv_ctx_t *ctx, size_t len);
21 static ngx_int_t ngx_http_rds_csv_get_buf(ngx_http_request_t *r,
22     ngx_http_rds_csv_ctx_t *ctx);
23 static u_char *ngx_http_rds_csv_get_postponed(ngx_http_request_t *r,
24     ngx_http_rds_csv_ctx_t *ctx, size_t len);
25 static ngx_int_t ngx_http_rds_csv_submit_mem(ngx_http_request_t *r,
26     ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf);
27 static size_t ngx_get_num_size(uint64_t i);
28 
29 
30 ngx_int_t
ngx_http_rds_csv_output_literal(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,u_char * data,size_t len,int last_buf)31 ngx_http_rds_csv_output_literal(ngx_http_request_t *r,
32     ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len,
33     int last_buf)
34 {
35     u_char                      *pos;
36 
37     pos = ngx_http_rds_csv_request_mem(r, ctx, len);
38     if (pos == NULL) {
39         return NGX_ERROR;
40     }
41 
42     ngx_memcpy(pos, data, len);
43 
44     dd("before output chain");
45 
46     if (last_buf) {
47         ctx->seen_stream_end = 1;
48 
49         if (r != r->main) {
50             last_buf = 0;
51         }
52     }
53 
54     return ngx_http_rds_csv_submit_mem(r, ctx, len, (unsigned) last_buf);
55 }
56 
57 
58 ngx_int_t
ngx_http_rds_csv_output_bufs(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx)59 ngx_http_rds_csv_output_bufs(ngx_http_request_t *r,
60     ngx_http_rds_csv_ctx_t *ctx)
61 {
62     ngx_int_t                rc;
63     ngx_chain_t             *cl;
64 
65     dd("entered output chain");
66 
67     if (ctx->seen_stream_end) {
68         ctx->seen_stream_end = 0;
69 
70         if (ctx->avail_out) {
71             cl = ngx_alloc_chain_link(r->pool);
72             if (cl == NULL) {
73                 return NGX_ERROR;
74             }
75 
76             cl->buf = ctx->out_buf;
77             cl->next = NULL;
78             *ctx->last_out = cl;
79             ctx->last_out = &cl->next;
80 
81             ctx->avail_out = 0;
82         }
83     }
84 
85     dd_dump_chain_size();
86 
87     for ( ;; ) {
88         if (ctx->out == NULL) {
89             /* fprintf(stderr, "\n"); */
90             return NGX_OK;
91         }
92 
93         /* fprintf(stderr, "XXX Relooping..."); */
94 
95         rc = ngx_http_rds_csv_next_body_filter(r, ctx->out);
96 
97         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
98             return rc;
99         }
100 
101 #if defined(nginx_version) && nginx_version >= 1001004
102         ngx_chain_update_chains(r->pool, &ctx->free_bufs, &ctx->busy_bufs,
103                                 &ctx->out, ctx->tag);
104 #else
105         ngx_chain_update_chains(&ctx->free_bufs, &ctx->busy_bufs, &ctx->out,
106                                 ctx->tag);
107 #endif
108 
109         ctx->last_out = &ctx->out;
110     }
111 
112     /* impossible to reach here */
113     return NGX_ERROR;
114 }
115 
116 
117 ngx_int_t
ngx_http_rds_csv_output_header(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,ngx_http_rds_header_t * header)118 ngx_http_rds_csv_output_header(ngx_http_request_t *r,
119     ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header)
120 {
121     u_char                  *pos, *last;
122     size_t                   size;
123     uintptr_t                escape;
124     unsigned                 last_buf = 0;
125     unsigned                 need_quotes = 0;
126     u_char                   sep;
127 
128     ngx_http_rds_csv_loc_conf_t       *conf;
129 
130     /* calculate the buffer size */
131 
132     conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
133 
134     if (conf->field_name_header) {
135         size = sizeof("errcode,errstr,insert_id,affected_rows") - 1
136                + conf->row_term.len;
137 
138     } else {
139         size = 0;
140     }
141 
142     sep = (u_char) conf->field_sep;
143 
144     size += 3 /* field seperators */ + conf->row_term.len;
145 
146     size += ngx_get_num_size(header->std_errcode);
147 
148     escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, header->errstr.data,
149                                              header->errstr.len,
150                                              &need_quotes);
151 
152     if (need_quotes) {
153         size += sizeof("\"\"") - 1;
154     }
155 
156     size += header->errstr.len + escape
157             + ngx_get_num_size(header->insert_id)
158             + ngx_get_num_size(header->affected_rows);
159 
160     /* create the buffer */
161 
162     pos = ngx_http_rds_csv_request_mem(r, ctx, size);
163     if (pos == NULL) {
164         return NGX_ERROR;
165     }
166 
167     last = pos;
168 
169     /* fill up the buffer */
170 
171     last = ngx_sprintf(last, "errcode%cerrstr%cinsert_id%caffected_rows%V"
172                        "%uD%c", sep, sep, sep, &conf->row_term,
173                        (uint32_t) header->std_errcode, sep);
174 
175     if (need_quotes) {
176         *last++ = '"';
177     }
178 
179     if (escape == 0) {
180         last = ngx_copy(last, header->errstr.data, header->errstr.len);
181 
182     } else {
183         last = (u_char *)
184                 ngx_http_rds_csv_escape_csv_str(sep, last,
185                                                 header->errstr.data,
186                                                 header->errstr.len, NULL);
187     }
188 
189     if (need_quotes) {
190         *last++ = '"';
191     }
192 
193     last = ngx_sprintf(last, "%c%uL%c%uL%V", sep, header->insert_id, sep,
194                        header->affected_rows, &conf->row_term);
195 
196     if ((size_t) (last - pos) != size) {
197         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
198                       "rds_csv: output header buffer error: %uz != %uz",
199                       (size_t) (last - pos), size);
200 
201         return NGX_ERROR;
202     }
203 
204     if (r == r->main) {
205         last_buf = 1;
206     }
207 
208     ctx->seen_stream_end = 1;
209 
210     return ngx_http_rds_csv_submit_mem(r, ctx, size, last_buf);
211 }
212 
213 
214 ngx_int_t
ngx_http_rds_csv_output_field_names(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx)215 ngx_http_rds_csv_output_field_names(ngx_http_request_t *r,
216     ngx_http_rds_csv_ctx_t *ctx)
217 {
218     ngx_uint_t                           i;
219     ngx_http_rds_column_t               *col;
220     size_t                               size;
221     u_char                              *pos, *last;
222     uintptr_t                            escape = 0;
223     unsigned                             need_quotes;
224     u_char                               sep;
225     ngx_http_rds_csv_loc_conf_t         *conf;
226 
227     conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
228 
229     sep = (u_char) conf->field_sep;
230 
231     size = ctx->col_count - 1 /* field sep count */
232          + conf->row_term.len;
233 
234     for (i = 0; i < ctx->col_count; i++) {
235         col = &ctx->cols[i];
236         escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data,
237                                                  col->name.len, &need_quotes);
238 
239         dd("field escape: %d", (int) escape);
240 
241         if (need_quotes) {
242             size += sizeof("\"\"") - 1;
243         }
244 
245         size += col->name.len + escape;
246     }
247 
248     ctx->generated_col_names = 1;
249 
250     pos = ngx_http_rds_csv_request_mem(r, ctx, size);
251     if (pos == NULL) {
252         return NGX_ERROR;
253     }
254 
255     last = pos;
256 
257     for (i = 0; i < ctx->col_count; i++) {
258         col = &ctx->cols[i];
259 
260         escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data,
261                                                  col->name.len, &need_quotes);
262 
263         if (need_quotes) {
264             *last++ = '"';
265         }
266 
267         if (escape == 0) {
268             last = ngx_copy(last, col->name.data, col->name.len);
269 
270         } else {
271             last = (u_char *)
272                    ngx_http_rds_csv_escape_csv_str(sep, last,
273                                                    col->name.data,
274                                                    col->name.len, NULL);
275         }
276 
277         if (need_quotes) {
278             *last++ = '"';
279         }
280 
281         if (i != ctx->col_count - 1) {
282             *last++ = sep;
283         }
284     }
285 
286     last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
287 
288     if ((size_t) (last - pos) != size) {
289         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
290                       "rds_csv: output field names buffer error: %uz != %uz",
291                       (size_t) (last - pos), size);
292 
293         return NGX_ERROR;
294     }
295 
296     return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
297 }
298 
299 
300 ngx_int_t
ngx_http_rds_csv_output_field(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,u_char * data,size_t len,int is_null)301 ngx_http_rds_csv_output_field(ngx_http_request_t *r,
302     ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null)
303 {
304     u_char                              *pos, *last;
305     ngx_http_rds_column_t               *col;
306     size_t                               size;
307     uintptr_t                            val_escape = 0;
308     unsigned                             need_quotes = 0;
309     u_char                               sep;
310     ngx_http_rds_csv_loc_conf_t         *conf;
311 #if DDEBUG
312     u_char                              *p;
313 #endif
314 
315     conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
316 
317     sep = (u_char) conf->field_sep;
318 
319     dd("reading row %llu, col %d, len %d",
320        (unsigned long long) ctx->row,
321        (int) ctx->cur_col, (int) len);
322 
323     /* calculate the buffer size */
324 
325     if (ctx->cur_col == 0) {
326         size = 0;
327 
328     } else {
329         size = 1 /* field sep */;
330     }
331 
332     col = &ctx->cols[ctx->cur_col];
333 
334     if (len == 0 && ctx->field_data_rest > 0) {
335         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
336                       "rds_csv: at least one octet should go with the field "
337                       "size in one buf");
338 
339         return NGX_ERROR;
340     }
341 
342     if (is_null) {
343         /* SQL NULL is just empty in the CSV field */
344 
345     } else if (len == 0) {
346         /* empty string is also empty */
347 
348     } else {
349         switch (col->std_type & 0xc000) {
350         case rds_rough_col_type_float:
351         case rds_rough_col_type_int:
352         case rds_rough_col_type_bool:
353             size += len;
354             break;
355 
356         default:
357             dd("string field found");
358 
359             val_escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len,
360                                                          &need_quotes);
361 
362             if (ctx->field_data_rest > 0 && !need_quotes) {
363                 need_quotes = 1;
364             }
365 
366             if (need_quotes) {
367                 if (ctx->field_data_rest == 0) {
368                     size += sizeof("\"\"") - 1;
369 
370                 } else {
371                     size += sizeof("\"") - 1;
372                 }
373             }
374 
375             size += len + val_escape;
376             break;
377         }
378     }
379 
380     if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
381         /* last column in the row */
382         size += conf->row_term.len;
383     }
384 
385     /* allocate the buffer */
386 
387     pos = ngx_http_rds_csv_request_mem(r, ctx, size);
388     if (pos == NULL) {
389         return NGX_ERROR;
390     }
391 
392     last = pos;
393 
394     /* fill up the buffer */
395 
396     if (ctx->cur_col != 0) {
397         *last++ = sep;
398     }
399 
400     if (is_null || len == 0) {
401         /* do nothing */
402 
403     } else {
404         switch (col->std_type & 0xc000) {
405         case rds_rough_col_type_int:
406         case rds_rough_col_type_float:
407         case rds_rough_col_type_bool:
408             last = ngx_copy(last, data, len);
409             break;
410 
411         default:
412             /* string */
413             if (need_quotes) {
414                 *last++ = '"';
415             }
416 
417             if (val_escape == 0) {
418                 last = ngx_copy(last, data, len);
419 
420             } else {
421                 dd("field: string value escape non-zero: %d",
422                    (int) val_escape);
423 
424 #if DDEBUG
425                 p = last;
426 #endif
427 
428                 last = (u_char *)
429                        ngx_http_rds_csv_escape_csv_str(sep, last, data, len,
430                                                        NULL);
431 
432 #if DDEBUG
433                 dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)",
434                    (int) (len + val_escape),
435                    p, (int) (len + val_escape),
436                    (int) val_escape,
437                    (int) ((last - p) - len));
438 #endif
439             }
440 
441             if (need_quotes && ctx->field_data_rest == 0) {
442                 *last++ = '"';
443             }
444 
445             break;
446         }
447     }
448 
449     if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
450         last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
451     }
452 
453     if ((size_t) (last - pos) != size) {
454         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
455                       "rds_csv: output field: buffer error (%d left)",
456                       (int) size - (last - pos));
457 
458         return NGX_ERROR;
459     }
460 
461     return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
462 }
463 
464 
465 ngx_int_t
ngx_http_rds_csv_output_more_field_data(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,u_char * data,size_t len)466 ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r,
467     ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len)
468 {
469     u_char                          *pos, *last;
470     size_t                           size = 0;
471     ngx_http_rds_column_t           *col;
472     uintptr_t                        escape = 0;
473 #if DDEBUG
474     u_char                          *p;
475 #endif
476     unsigned                         need_quotes;
477     u_char                           sep;
478     ngx_http_rds_csv_loc_conf_t     *conf;
479 
480     conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
481 
482     sep = (u_char) conf->field_sep;
483 
484     /* calculate the buffer size */
485 
486     col = &ctx->cols[ctx->cur_col];
487 
488     switch (col->std_type & 0xc000) {
489     case rds_rough_col_type_int:
490     case rds_rough_col_type_float:
491     case rds_rough_col_type_bool:
492         size += len;
493         break;
494 
495     default:
496         /* string */
497 
498         escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len,
499                                                  &need_quotes);
500 
501         size = len + escape;
502 
503         if (ctx->field_data_rest == 0) {
504             size += sizeof("\"") - 1;
505         }
506 
507         break;
508     }
509 
510     if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
511         /* last column in the row */
512         size += conf->row_term.len;
513     }
514 
515     /* allocate the buffer */
516 
517     pos = ngx_http_rds_csv_request_mem(r, ctx, size);
518     if (pos == NULL) {
519         return NGX_ERROR;
520     }
521 
522     last = pos;
523 
524     /* fill up the buffer */
525 
526     switch (col->std_type & 0xc000) {
527     case rds_rough_col_type_int:
528     case rds_rough_col_type_float:
529     case rds_rough_col_type_bool:
530         last = ngx_copy(last, data, len);
531         break;
532 
533     default:
534         /* string */
535         if (escape == 0) {
536             last = ngx_copy(last, data, len);
537 
538         } else {
539             dd("more field data: string value escape non-zero: %d",
540                (int) escape);
541 
542 #if DDEBUG
543             p = last;
544 #endif
545 
546             last = (u_char *) ngx_http_rds_csv_escape_csv_str(sep, last, data,
547                                                               len, NULL);
548 
549 #if DDEBUG
550             dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)",
551                (int) (len + escape),
552                p, (int) (len + escape),
553                (int) escape,
554                (int) ((last - p) - len));
555 #endif
556         }
557 
558         if (ctx->field_data_rest == 0) {
559             *last++ = '"';
560         }
561 
562         break;
563     } /* switch */
564 
565     if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
566         /* last column in the row */
567         last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
568     }
569 
570     if ((size_t) (last - pos) != size) {
571         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
572                       "rds_csv: output more field data: buffer error "
573                       "(%d left)", (int) (size - (last - pos)));
574         return NGX_ERROR;
575     }
576 
577     return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
578 }
579 
580 
581 static u_char *
ngx_http_rds_csv_request_mem(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,size_t len)582 ngx_http_rds_csv_request_mem(ngx_http_request_t *r,
583     ngx_http_rds_csv_ctx_t *ctx, size_t len)
584 {
585     ngx_int_t                rc;
586     u_char                  *p;
587 
588     rc = ngx_http_rds_csv_get_buf(r, ctx);
589     if (rc != NGX_OK) {
590         return NULL;
591     }
592 
593     if (ctx->avail_out < len) {
594         p = ngx_http_rds_csv_get_postponed(r, ctx, len);
595         if (p == NULL) {
596             return NULL;
597         }
598 
599         ctx->postponed.pos = p;
600         ctx->postponed.last = p + len;
601 
602         return p;
603     }
604 
605     return ctx->out_buf->last;
606 }
607 
608 
609 static ngx_int_t
ngx_http_rds_csv_get_buf(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx)610 ngx_http_rds_csv_get_buf(ngx_http_request_t *r, ngx_http_rds_csv_ctx_t *ctx)
611 {
612     ngx_http_rds_csv_loc_conf_t         *conf;
613 
614     dd("MEM enter");
615 
616     if (ctx->avail_out) {
617         return NGX_OK;
618     }
619 
620     conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
621 
622     if (ctx->free_bufs) {
623         dd("MEM reusing temp buf from free_bufs");
624 
625         ctx->out_buf = ctx->free_bufs->buf;
626         ctx->free_bufs = ctx->free_bufs->next;
627 
628     } else {
629         dd("MEM creating temp buf with size: %d", (int) conf->buf_size);
630         ctx->out_buf = ngx_create_temp_buf(r->pool, conf->buf_size);
631         if (ctx->out_buf == NULL) {
632             return NGX_ERROR;
633         }
634 
635         ctx->out_buf->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module;
636         ctx->out_buf->recycled = 1;
637     }
638 
639     ctx->avail_out = conf->buf_size;
640 
641     return NGX_OK;
642 }
643 
644 
645 static u_char *
ngx_http_rds_csv_get_postponed(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,size_t len)646 ngx_http_rds_csv_get_postponed(ngx_http_request_t *r,
647     ngx_http_rds_csv_ctx_t *ctx, size_t len)
648 {
649     u_char          *p;
650 
651     dd("MEM enter");
652 
653     if (ctx->cached.start == NULL) {
654         goto alloc;
655     }
656 
657     if ((size_t) (ctx->cached.end - ctx->cached.start) < len) {
658         ngx_pfree(r->pool, ctx->cached.start);
659         goto alloc;
660     }
661 
662     return ctx->cached.start;
663 
664 alloc:
665 
666     p = ngx_palloc(r->pool, len);
667     if (p == NULL) {
668         return NULL;
669     }
670 
671     ctx->cached.start = p;
672     ctx->cached.end = p + len;
673 
674     return p;
675 }
676 
677 
678 static ngx_int_t
ngx_http_rds_csv_submit_mem(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,size_t len,unsigned last_buf)679 ngx_http_rds_csv_submit_mem(ngx_http_request_t *r,
680     ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf)
681 {
682     ngx_chain_t             *cl;
683     ngx_int_t                rc;
684 
685     if (ctx->postponed.pos != NULL) {
686         dd("MEM copy postponed data over to ctx->out for len %d", (int) len);
687 
688         for ( ;; ) {
689             len = ctx->postponed.last - ctx->postponed.pos;
690             if (len > ctx->avail_out) {
691                 len = ctx->avail_out;
692             }
693 
694             ctx->out_buf->last = ngx_copy(ctx->out_buf->last,
695                                           ctx->postponed.pos, len);
696 
697             ctx->avail_out -= len;
698 
699             ctx->postponed.pos += len;
700 
701             if (ctx->postponed.pos == ctx->postponed.last) {
702                 ctx->postponed.pos = NULL;
703             }
704 
705             if (ctx->avail_out > 0) {
706                 break;
707             }
708 
709             dd("MEM save ctx->out_buf");
710 
711             cl = ngx_alloc_chain_link(r->pool);
712             if (cl == NULL) {
713                 return NGX_ERROR;
714             }
715 
716             cl->buf = ctx->out_buf;
717             cl->next = NULL;
718             *ctx->last_out = cl;
719             ctx->last_out = &cl->next;
720 
721             if (ctx->postponed.pos == NULL) {
722                 ctx->out_buf->last_buf = last_buf;
723                 break;
724             }
725 
726             rc = ngx_http_rds_csv_get_buf(r, ctx);
727             if (rc != NGX_OK) {
728                 return NGX_ERROR;
729             }
730         }
731 
732         return NGX_OK;
733     }
734 
735     dd("MEM consuming out_buf for %d", (int) len);
736 
737     ctx->out_buf->last += len;
738     ctx->avail_out -= len;
739     ctx->out_buf->last_buf = last_buf;
740 
741     if (ctx->avail_out == 0) {
742         dd("MEM save ctx->out_buf");
743 
744         cl = ngx_alloc_chain_link(r->pool);
745         if (cl == NULL) {
746             return NGX_ERROR;
747         }
748 
749         cl->buf = ctx->out_buf;
750         cl->next = NULL;
751         *ctx->last_out = cl;
752         ctx->last_out = &cl->next;
753     }
754 
755     return NGX_OK;
756 }
757 
758 
759 static size_t
ngx_get_num_size(uint64_t i)760 ngx_get_num_size(uint64_t i)
761 {
762     size_t          n = 0;
763 
764     do {
765         i = i / 10;
766         n++;
767     } while (i > 0);
768 
769     return n;
770 }
771