1
2 /*
3 * Copyright (C) Yichun Zhang (agentzh)
4 */
5
6
7 #ifndef DDEBUG
8 #define DDEBUG 0
9 #endif
10 #include "ddebug.h"
11
12
13 #include "ngx_http_rds_csv_filter_module.h"
14 #include "ngx_http_rds_csv_output.h"
15 #include "ngx_http_rds_csv_util.h"
16 #include "resty_dbd_stream.h"
17
18
19 static u_char *ngx_http_rds_csv_request_mem(ngx_http_request_t *r,
20 ngx_http_rds_csv_ctx_t *ctx, size_t len);
21 static ngx_int_t ngx_http_rds_csv_get_buf(ngx_http_request_t *r,
22 ngx_http_rds_csv_ctx_t *ctx);
23 static u_char *ngx_http_rds_csv_get_postponed(ngx_http_request_t *r,
24 ngx_http_rds_csv_ctx_t *ctx, size_t len);
25 static ngx_int_t ngx_http_rds_csv_submit_mem(ngx_http_request_t *r,
26 ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf);
27 static size_t ngx_get_num_size(uint64_t i);
28
29
30 ngx_int_t
ngx_http_rds_csv_output_literal(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,u_char * data,size_t len,int last_buf)31 ngx_http_rds_csv_output_literal(ngx_http_request_t *r,
32 ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len,
33 int last_buf)
34 {
35 u_char *pos;
36
37 pos = ngx_http_rds_csv_request_mem(r, ctx, len);
38 if (pos == NULL) {
39 return NGX_ERROR;
40 }
41
42 ngx_memcpy(pos, data, len);
43
44 dd("before output chain");
45
46 if (last_buf) {
47 ctx->seen_stream_end = 1;
48
49 if (r != r->main) {
50 last_buf = 0;
51 }
52 }
53
54 return ngx_http_rds_csv_submit_mem(r, ctx, len, (unsigned) last_buf);
55 }
56
57
58 ngx_int_t
ngx_http_rds_csv_output_bufs(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx)59 ngx_http_rds_csv_output_bufs(ngx_http_request_t *r,
60 ngx_http_rds_csv_ctx_t *ctx)
61 {
62 ngx_int_t rc;
63 ngx_chain_t *cl;
64
65 dd("entered output chain");
66
67 if (ctx->seen_stream_end) {
68 ctx->seen_stream_end = 0;
69
70 if (ctx->avail_out) {
71 cl = ngx_alloc_chain_link(r->pool);
72 if (cl == NULL) {
73 return NGX_ERROR;
74 }
75
76 cl->buf = ctx->out_buf;
77 cl->next = NULL;
78 *ctx->last_out = cl;
79 ctx->last_out = &cl->next;
80
81 ctx->avail_out = 0;
82 }
83 }
84
85 dd_dump_chain_size();
86
87 for ( ;; ) {
88 if (ctx->out == NULL) {
89 /* fprintf(stderr, "\n"); */
90 return NGX_OK;
91 }
92
93 /* fprintf(stderr, "XXX Relooping..."); */
94
95 rc = ngx_http_rds_csv_next_body_filter(r, ctx->out);
96
97 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
98 return rc;
99 }
100
101 #if defined(nginx_version) && nginx_version >= 1001004
102 ngx_chain_update_chains(r->pool, &ctx->free_bufs, &ctx->busy_bufs,
103 &ctx->out, ctx->tag);
104 #else
105 ngx_chain_update_chains(&ctx->free_bufs, &ctx->busy_bufs, &ctx->out,
106 ctx->tag);
107 #endif
108
109 ctx->last_out = &ctx->out;
110 }
111
112 /* impossible to reach here */
113 return NGX_ERROR;
114 }
115
116
117 ngx_int_t
ngx_http_rds_csv_output_header(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,ngx_http_rds_header_t * header)118 ngx_http_rds_csv_output_header(ngx_http_request_t *r,
119 ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header)
120 {
121 u_char *pos, *last;
122 size_t size;
123 uintptr_t escape;
124 unsigned last_buf = 0;
125 unsigned need_quotes = 0;
126 u_char sep;
127
128 ngx_http_rds_csv_loc_conf_t *conf;
129
130 /* calculate the buffer size */
131
132 conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
133
134 if (conf->field_name_header) {
135 size = sizeof("errcode,errstr,insert_id,affected_rows") - 1
136 + conf->row_term.len;
137
138 } else {
139 size = 0;
140 }
141
142 sep = (u_char) conf->field_sep;
143
144 size += 3 /* field seperators */ + conf->row_term.len;
145
146 size += ngx_get_num_size(header->std_errcode);
147
148 escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, header->errstr.data,
149 header->errstr.len,
150 &need_quotes);
151
152 if (need_quotes) {
153 size += sizeof("\"\"") - 1;
154 }
155
156 size += header->errstr.len + escape
157 + ngx_get_num_size(header->insert_id)
158 + ngx_get_num_size(header->affected_rows);
159
160 /* create the buffer */
161
162 pos = ngx_http_rds_csv_request_mem(r, ctx, size);
163 if (pos == NULL) {
164 return NGX_ERROR;
165 }
166
167 last = pos;
168
169 /* fill up the buffer */
170
171 last = ngx_sprintf(last, "errcode%cerrstr%cinsert_id%caffected_rows%V"
172 "%uD%c", sep, sep, sep, &conf->row_term,
173 (uint32_t) header->std_errcode, sep);
174
175 if (need_quotes) {
176 *last++ = '"';
177 }
178
179 if (escape == 0) {
180 last = ngx_copy(last, header->errstr.data, header->errstr.len);
181
182 } else {
183 last = (u_char *)
184 ngx_http_rds_csv_escape_csv_str(sep, last,
185 header->errstr.data,
186 header->errstr.len, NULL);
187 }
188
189 if (need_quotes) {
190 *last++ = '"';
191 }
192
193 last = ngx_sprintf(last, "%c%uL%c%uL%V", sep, header->insert_id, sep,
194 header->affected_rows, &conf->row_term);
195
196 if ((size_t) (last - pos) != size) {
197 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
198 "rds_csv: output header buffer error: %uz != %uz",
199 (size_t) (last - pos), size);
200
201 return NGX_ERROR;
202 }
203
204 if (r == r->main) {
205 last_buf = 1;
206 }
207
208 ctx->seen_stream_end = 1;
209
210 return ngx_http_rds_csv_submit_mem(r, ctx, size, last_buf);
211 }
212
213
214 ngx_int_t
ngx_http_rds_csv_output_field_names(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx)215 ngx_http_rds_csv_output_field_names(ngx_http_request_t *r,
216 ngx_http_rds_csv_ctx_t *ctx)
217 {
218 ngx_uint_t i;
219 ngx_http_rds_column_t *col;
220 size_t size;
221 u_char *pos, *last;
222 uintptr_t escape = 0;
223 unsigned need_quotes;
224 u_char sep;
225 ngx_http_rds_csv_loc_conf_t *conf;
226
227 conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
228
229 sep = (u_char) conf->field_sep;
230
231 size = ctx->col_count - 1 /* field sep count */
232 + conf->row_term.len;
233
234 for (i = 0; i < ctx->col_count; i++) {
235 col = &ctx->cols[i];
236 escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data,
237 col->name.len, &need_quotes);
238
239 dd("field escape: %d", (int) escape);
240
241 if (need_quotes) {
242 size += sizeof("\"\"") - 1;
243 }
244
245 size += col->name.len + escape;
246 }
247
248 ctx->generated_col_names = 1;
249
250 pos = ngx_http_rds_csv_request_mem(r, ctx, size);
251 if (pos == NULL) {
252 return NGX_ERROR;
253 }
254
255 last = pos;
256
257 for (i = 0; i < ctx->col_count; i++) {
258 col = &ctx->cols[i];
259
260 escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data,
261 col->name.len, &need_quotes);
262
263 if (need_quotes) {
264 *last++ = '"';
265 }
266
267 if (escape == 0) {
268 last = ngx_copy(last, col->name.data, col->name.len);
269
270 } else {
271 last = (u_char *)
272 ngx_http_rds_csv_escape_csv_str(sep, last,
273 col->name.data,
274 col->name.len, NULL);
275 }
276
277 if (need_quotes) {
278 *last++ = '"';
279 }
280
281 if (i != ctx->col_count - 1) {
282 *last++ = sep;
283 }
284 }
285
286 last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
287
288 if ((size_t) (last - pos) != size) {
289 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
290 "rds_csv: output field names buffer error: %uz != %uz",
291 (size_t) (last - pos), size);
292
293 return NGX_ERROR;
294 }
295
296 return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
297 }
298
299
300 ngx_int_t
ngx_http_rds_csv_output_field(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,u_char * data,size_t len,int is_null)301 ngx_http_rds_csv_output_field(ngx_http_request_t *r,
302 ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null)
303 {
304 u_char *pos, *last;
305 ngx_http_rds_column_t *col;
306 size_t size;
307 uintptr_t val_escape = 0;
308 unsigned need_quotes = 0;
309 u_char sep;
310 ngx_http_rds_csv_loc_conf_t *conf;
311 #if DDEBUG
312 u_char *p;
313 #endif
314
315 conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
316
317 sep = (u_char) conf->field_sep;
318
319 dd("reading row %llu, col %d, len %d",
320 (unsigned long long) ctx->row,
321 (int) ctx->cur_col, (int) len);
322
323 /* calculate the buffer size */
324
325 if (ctx->cur_col == 0) {
326 size = 0;
327
328 } else {
329 size = 1 /* field sep */;
330 }
331
332 col = &ctx->cols[ctx->cur_col];
333
334 if (len == 0 && ctx->field_data_rest > 0) {
335 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
336 "rds_csv: at least one octet should go with the field "
337 "size in one buf");
338
339 return NGX_ERROR;
340 }
341
342 if (is_null) {
343 /* SQL NULL is just empty in the CSV field */
344
345 } else if (len == 0) {
346 /* empty string is also empty */
347
348 } else {
349 switch (col->std_type & 0xc000) {
350 case rds_rough_col_type_float:
351 case rds_rough_col_type_int:
352 case rds_rough_col_type_bool:
353 size += len;
354 break;
355
356 default:
357 dd("string field found");
358
359 val_escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len,
360 &need_quotes);
361
362 if (ctx->field_data_rest > 0 && !need_quotes) {
363 need_quotes = 1;
364 }
365
366 if (need_quotes) {
367 if (ctx->field_data_rest == 0) {
368 size += sizeof("\"\"") - 1;
369
370 } else {
371 size += sizeof("\"") - 1;
372 }
373 }
374
375 size += len + val_escape;
376 break;
377 }
378 }
379
380 if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
381 /* last column in the row */
382 size += conf->row_term.len;
383 }
384
385 /* allocate the buffer */
386
387 pos = ngx_http_rds_csv_request_mem(r, ctx, size);
388 if (pos == NULL) {
389 return NGX_ERROR;
390 }
391
392 last = pos;
393
394 /* fill up the buffer */
395
396 if (ctx->cur_col != 0) {
397 *last++ = sep;
398 }
399
400 if (is_null || len == 0) {
401 /* do nothing */
402
403 } else {
404 switch (col->std_type & 0xc000) {
405 case rds_rough_col_type_int:
406 case rds_rough_col_type_float:
407 case rds_rough_col_type_bool:
408 last = ngx_copy(last, data, len);
409 break;
410
411 default:
412 /* string */
413 if (need_quotes) {
414 *last++ = '"';
415 }
416
417 if (val_escape == 0) {
418 last = ngx_copy(last, data, len);
419
420 } else {
421 dd("field: string value escape non-zero: %d",
422 (int) val_escape);
423
424 #if DDEBUG
425 p = last;
426 #endif
427
428 last = (u_char *)
429 ngx_http_rds_csv_escape_csv_str(sep, last, data, len,
430 NULL);
431
432 #if DDEBUG
433 dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)",
434 (int) (len + val_escape),
435 p, (int) (len + val_escape),
436 (int) val_escape,
437 (int) ((last - p) - len));
438 #endif
439 }
440
441 if (need_quotes && ctx->field_data_rest == 0) {
442 *last++ = '"';
443 }
444
445 break;
446 }
447 }
448
449 if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
450 last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
451 }
452
453 if ((size_t) (last - pos) != size) {
454 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
455 "rds_csv: output field: buffer error (%d left)",
456 (int) size - (last - pos));
457
458 return NGX_ERROR;
459 }
460
461 return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
462 }
463
464
465 ngx_int_t
ngx_http_rds_csv_output_more_field_data(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,u_char * data,size_t len)466 ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r,
467 ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len)
468 {
469 u_char *pos, *last;
470 size_t size = 0;
471 ngx_http_rds_column_t *col;
472 uintptr_t escape = 0;
473 #if DDEBUG
474 u_char *p;
475 #endif
476 unsigned need_quotes;
477 u_char sep;
478 ngx_http_rds_csv_loc_conf_t *conf;
479
480 conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
481
482 sep = (u_char) conf->field_sep;
483
484 /* calculate the buffer size */
485
486 col = &ctx->cols[ctx->cur_col];
487
488 switch (col->std_type & 0xc000) {
489 case rds_rough_col_type_int:
490 case rds_rough_col_type_float:
491 case rds_rough_col_type_bool:
492 size += len;
493 break;
494
495 default:
496 /* string */
497
498 escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len,
499 &need_quotes);
500
501 size = len + escape;
502
503 if (ctx->field_data_rest == 0) {
504 size += sizeof("\"") - 1;
505 }
506
507 break;
508 }
509
510 if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
511 /* last column in the row */
512 size += conf->row_term.len;
513 }
514
515 /* allocate the buffer */
516
517 pos = ngx_http_rds_csv_request_mem(r, ctx, size);
518 if (pos == NULL) {
519 return NGX_ERROR;
520 }
521
522 last = pos;
523
524 /* fill up the buffer */
525
526 switch (col->std_type & 0xc000) {
527 case rds_rough_col_type_int:
528 case rds_rough_col_type_float:
529 case rds_rough_col_type_bool:
530 last = ngx_copy(last, data, len);
531 break;
532
533 default:
534 /* string */
535 if (escape == 0) {
536 last = ngx_copy(last, data, len);
537
538 } else {
539 dd("more field data: string value escape non-zero: %d",
540 (int) escape);
541
542 #if DDEBUG
543 p = last;
544 #endif
545
546 last = (u_char *) ngx_http_rds_csv_escape_csv_str(sep, last, data,
547 len, NULL);
548
549 #if DDEBUG
550 dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)",
551 (int) (len + escape),
552 p, (int) (len + escape),
553 (int) escape,
554 (int) ((last - p) - len));
555 #endif
556 }
557
558 if (ctx->field_data_rest == 0) {
559 *last++ = '"';
560 }
561
562 break;
563 } /* switch */
564
565 if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
566 /* last column in the row */
567 last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
568 }
569
570 if ((size_t) (last - pos) != size) {
571 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
572 "rds_csv: output more field data: buffer error "
573 "(%d left)", (int) (size - (last - pos)));
574 return NGX_ERROR;
575 }
576
577 return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
578 }
579
580
581 static u_char *
ngx_http_rds_csv_request_mem(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,size_t len)582 ngx_http_rds_csv_request_mem(ngx_http_request_t *r,
583 ngx_http_rds_csv_ctx_t *ctx, size_t len)
584 {
585 ngx_int_t rc;
586 u_char *p;
587
588 rc = ngx_http_rds_csv_get_buf(r, ctx);
589 if (rc != NGX_OK) {
590 return NULL;
591 }
592
593 if (ctx->avail_out < len) {
594 p = ngx_http_rds_csv_get_postponed(r, ctx, len);
595 if (p == NULL) {
596 return NULL;
597 }
598
599 ctx->postponed.pos = p;
600 ctx->postponed.last = p + len;
601
602 return p;
603 }
604
605 return ctx->out_buf->last;
606 }
607
608
609 static ngx_int_t
ngx_http_rds_csv_get_buf(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx)610 ngx_http_rds_csv_get_buf(ngx_http_request_t *r, ngx_http_rds_csv_ctx_t *ctx)
611 {
612 ngx_http_rds_csv_loc_conf_t *conf;
613
614 dd("MEM enter");
615
616 if (ctx->avail_out) {
617 return NGX_OK;
618 }
619
620 conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
621
622 if (ctx->free_bufs) {
623 dd("MEM reusing temp buf from free_bufs");
624
625 ctx->out_buf = ctx->free_bufs->buf;
626 ctx->free_bufs = ctx->free_bufs->next;
627
628 } else {
629 dd("MEM creating temp buf with size: %d", (int) conf->buf_size);
630 ctx->out_buf = ngx_create_temp_buf(r->pool, conf->buf_size);
631 if (ctx->out_buf == NULL) {
632 return NGX_ERROR;
633 }
634
635 ctx->out_buf->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module;
636 ctx->out_buf->recycled = 1;
637 }
638
639 ctx->avail_out = conf->buf_size;
640
641 return NGX_OK;
642 }
643
644
645 static u_char *
ngx_http_rds_csv_get_postponed(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,size_t len)646 ngx_http_rds_csv_get_postponed(ngx_http_request_t *r,
647 ngx_http_rds_csv_ctx_t *ctx, size_t len)
648 {
649 u_char *p;
650
651 dd("MEM enter");
652
653 if (ctx->cached.start == NULL) {
654 goto alloc;
655 }
656
657 if ((size_t) (ctx->cached.end - ctx->cached.start) < len) {
658 ngx_pfree(r->pool, ctx->cached.start);
659 goto alloc;
660 }
661
662 return ctx->cached.start;
663
664 alloc:
665
666 p = ngx_palloc(r->pool, len);
667 if (p == NULL) {
668 return NULL;
669 }
670
671 ctx->cached.start = p;
672 ctx->cached.end = p + len;
673
674 return p;
675 }
676
677
678 static ngx_int_t
ngx_http_rds_csv_submit_mem(ngx_http_request_t * r,ngx_http_rds_csv_ctx_t * ctx,size_t len,unsigned last_buf)679 ngx_http_rds_csv_submit_mem(ngx_http_request_t *r,
680 ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf)
681 {
682 ngx_chain_t *cl;
683 ngx_int_t rc;
684
685 if (ctx->postponed.pos != NULL) {
686 dd("MEM copy postponed data over to ctx->out for len %d", (int) len);
687
688 for ( ;; ) {
689 len = ctx->postponed.last - ctx->postponed.pos;
690 if (len > ctx->avail_out) {
691 len = ctx->avail_out;
692 }
693
694 ctx->out_buf->last = ngx_copy(ctx->out_buf->last,
695 ctx->postponed.pos, len);
696
697 ctx->avail_out -= len;
698
699 ctx->postponed.pos += len;
700
701 if (ctx->postponed.pos == ctx->postponed.last) {
702 ctx->postponed.pos = NULL;
703 }
704
705 if (ctx->avail_out > 0) {
706 break;
707 }
708
709 dd("MEM save ctx->out_buf");
710
711 cl = ngx_alloc_chain_link(r->pool);
712 if (cl == NULL) {
713 return NGX_ERROR;
714 }
715
716 cl->buf = ctx->out_buf;
717 cl->next = NULL;
718 *ctx->last_out = cl;
719 ctx->last_out = &cl->next;
720
721 if (ctx->postponed.pos == NULL) {
722 ctx->out_buf->last_buf = last_buf;
723 break;
724 }
725
726 rc = ngx_http_rds_csv_get_buf(r, ctx);
727 if (rc != NGX_OK) {
728 return NGX_ERROR;
729 }
730 }
731
732 return NGX_OK;
733 }
734
735 dd("MEM consuming out_buf for %d", (int) len);
736
737 ctx->out_buf->last += len;
738 ctx->avail_out -= len;
739 ctx->out_buf->last_buf = last_buf;
740
741 if (ctx->avail_out == 0) {
742 dd("MEM save ctx->out_buf");
743
744 cl = ngx_alloc_chain_link(r->pool);
745 if (cl == NULL) {
746 return NGX_ERROR;
747 }
748
749 cl->buf = ctx->out_buf;
750 cl->next = NULL;
751 *ctx->last_out = cl;
752 ctx->last_out = &cl->next;
753 }
754
755 return NGX_OK;
756 }
757
758
/*
 * Return the number of decimal digits needed to print `i`
 * (1 for values 0..9, 2 for 10..99, and so on).
 */
static size_t
ngx_get_num_size(uint64_t i)
{
    size_t      digits;

    /* every number has at least one digit, including zero */
    for (digits = 1; i >= 10; digits++) {
        i /= 10;
    }

    return digits;
}
771