1 /*
2  * Copyright (C) agentzh
3  */
4 
5 
6 #ifndef DDEBUG
7 #define DDEBUG 0
8 #endif
9 #include "ddebug.h"
10 
11 
12 #include "ngx_http_rds_csv_processor.h"
13 #include "ngx_http_rds_csv_util.h"
14 #include "ngx_http_rds_csv_output.h"
15 #include "ngx_http_rds.h"
16 #include "ngx_http_rds_utils.h"
17 
18 
19 #include <ngx_core.h>
20 #include <ngx_http.h>
21 
22 
23 ngx_int_t
ngx_http_rds_csv_process_header(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)24 ngx_http_rds_csv_process_header(ngx_http_request_t *r, ngx_chain_t *in,
25     ngx_http_rds_csv_ctx_t *ctx)
26 {
27     ngx_buf_t                       *b;
28     ngx_http_rds_header_t            header;
29     ngx_int_t                        rc;
30 
31     if (in == NULL) {
32         return NGX_OK;
33     }
34 
35     b = in->buf;
36 
37     if (!ngx_buf_in_memory(b)) {
38         if (!ngx_buf_special(b)) {
39             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
40                           "rds_csv: process header: buf from "
41                           "upstream not in memory");
42             goto invalid;
43         }
44 
45         in = in->next;
46 
47         if (in == NULL) {
48             return NGX_OK;
49         }
50 
51         b = in->buf;
52     }
53 
54     rc = ngx_http_rds_parse_header(r, b, &header);
55 
56     if (rc != NGX_OK) {
57         goto invalid;
58     }
59 
60     dd("col count: %d", (int) header.col_count);
61 
62     if (header.col_count == 0) {
63         /* for empty result set, just return the JSON
64          * representation of the RDS header */
65 
66         dd("col count == 0");
67 
68         if (b->pos != b->last) {
69             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
70                           "rds_csv: header: there's unexpected remaining data "
71                           "in the buf");
72 
73             goto invalid;
74         }
75 
76         ctx->state = state_done;
77 
78         /* now we send the postponed response header */
79         if (!ctx->header_sent) {
80             ctx->header_sent = 1;
81 
82             rc = ngx_http_rds_csv_next_header_filter(r);
83 
84             if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
85                 return rc;
86             }
87         }
88 
89         rc = ngx_http_rds_csv_output_header(r, ctx, &header);
90 
91         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
92             return rc;
93         }
94 
95         ngx_http_rds_csv_discard_bufs(r->pool, in);
96 
97         return rc;
98     }
99 
100     ctx->cols = ngx_palloc(r->pool,
101                            header.col_count * sizeof(ngx_http_rds_column_t));
102 
103     if (ctx->cols == NULL) {
104         goto invalid;
105     }
106 
107     ctx->state = state_expect_col;
108     ctx->cur_col = 0;
109     ctx->col_count = header.col_count;
110 
111     /* now we send the postponed response header */
112     if (!ctx->header_sent) {
113         ctx->header_sent = 1;
114 
115         rc = ngx_http_rds_csv_next_header_filter(r);
116         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
117             return rc;
118         }
119     }
120 
121     return ngx_http_rds_csv_process_col(r, b->pos == b->last ? in->next : in,
122                                         ctx);
123 
124 invalid:
125 
126     dd("return 500");
127     if (!ctx->header_sent) {
128         ctx->header_sent = 1;
129 
130         r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
131         ngx_http_send_header(r);
132         ngx_http_send_special(r, NGX_HTTP_LAST);
133 
134         return NGX_ERROR;
135     }
136 
137     return NGX_ERROR;
138 }
139 
140 
141 ngx_int_t
ngx_http_rds_csv_process_col(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)142 ngx_http_rds_csv_process_col(ngx_http_request_t *r, ngx_chain_t *in,
143     ngx_http_rds_csv_ctx_t *ctx)
144 {
145     ngx_buf_t                       *b;
146     ngx_int_t                        rc;
147     ngx_http_rds_csv_loc_conf_t     *conf;
148 
149     if (in == NULL) {
150         return NGX_OK;
151     }
152 
153     b = in->buf;
154 
155     if (!ngx_buf_in_memory(b)) {
156         if (!ngx_buf_special(b)) {
157             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
158                           "rds_csv: process col: buf from upstream not in "
159                           "memory");
160             return NGX_ERROR;
161         }
162 
163         in = in->next;
164 
165         if (in == NULL) {
166             return NGX_OK;
167         }
168 
169         b = in->buf;
170     }
171 
172     dd("parsing rds column");
173 
174     rc = ngx_http_rds_parse_col(r, b, &ctx->cols[ctx->cur_col]);
175 
176     dd("parse col returns %d (%d)", (int) rc, (int) NGX_OK);
177 
178     if (rc != NGX_OK) {
179         return NGX_ERROR;
180     }
181 
182     if (b->pos == b->last) {
183         dd("parse col buf consumed");
184         in = in->next;
185     }
186 
187     ctx->cur_col++;
188 
189     if (ctx->cur_col >= ctx->col_count) {
190         dd("end of column list");
191 
192         ctx->state = state_expect_row;
193         ctx->row = 0;
194 
195         conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
196 
197         if (conf->field_name_header) {
198             rc = ngx_http_rds_csv_output_field_names(r, ctx);
199 
200             if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
201                 return rc;
202             }
203         }
204 
205         dd("after output literal");
206 
207         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
208             return rc;
209         }
210 
211 
212         dd("process col is entering process row...");
213         return ngx_http_rds_csv_process_row(r, in, ctx);
214     }
215 
216     return ngx_http_rds_csv_process_col(r, in, ctx);
217 }
218 
219 
220 ngx_int_t
ngx_http_rds_csv_process_row(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)221 ngx_http_rds_csv_process_row(ngx_http_request_t *r, ngx_chain_t *in,
222     ngx_http_rds_csv_ctx_t *ctx)
223 {
224     ngx_buf_t                   *b;
225     ngx_int_t                    rc;
226 
227     if (in == NULL) {
228         return NGX_OK;
229     }
230 
231     dd("process row");
232 
233     b = in->buf;
234 
235     if (!ngx_buf_in_memory(b)) {
236         if (!ngx_buf_special(b)) {
237             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
238                           "rds_csv: process row: buf from "
239                           "upstream not in memory");
240             return NGX_ERROR;
241         }
242 
243         in = in->next;
244 
245         if (in == NULL) {
246             return NGX_OK;
247         }
248 
249         b = in->buf;
250     }
251 
252     if (b->last - b->pos < (ssize_t) sizeof(uint8_t)) {
253         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
254                       "rds_csv: row flag is incomplete in the buf");
255         return NGX_ERROR;
256     }
257 
258     dd("row flag: %d (offset %d)", (char) *b->pos, (int) (b->pos - b->start));
259 
260     if (*b->pos++ == 0) {
261         /* end of row list */
262         ctx->state = state_done;
263 
264         if (b->pos != b->last) {
265             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
266                           "rds_csv: row: there's unexpected remaining data "
267                           "in the buf");
268             return NGX_ERROR;
269         }
270 
271         rc = ngx_http_rds_csv_output_literal(r, ctx, (u_char *) "", 0,
272                                              1 /* last buf*/);
273 
274         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
275             return rc;
276         }
277 
278         return rc;
279     }
280 
281     ctx->row++;
282     ctx->cur_col = 0;
283     ctx->state = state_expect_field;
284 
285     if (b->pos == b->last) {
286         in = in->next;
287 
288     } else {
289         dd("process row: buf not consumed completely");
290     }
291 
292     return ngx_http_rds_csv_process_field(r, in, ctx);
293 }
294 
295 
296 ngx_int_t
ngx_http_rds_csv_process_field(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)297 ngx_http_rds_csv_process_field(ngx_http_request_t *r, ngx_chain_t *in,
298     ngx_http_rds_csv_ctx_t *ctx)
299 {
300     size_t              total, len;
301     ngx_buf_t          *b;
302     ngx_int_t           rc;
303 
304     for (;;) {
305         if (in == NULL) {
306             return NGX_OK;
307         }
308 
309         b = in->buf;
310 
311         if (!ngx_buf_in_memory(b)) {
312             dd("buf not in memory");
313 
314             if (!ngx_buf_special(b)) {
315                 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
316                               "rds_csv: process field: buf from "
317                               "upstream not in memory");
318                 return NGX_ERROR;
319             }
320 
321             in = in->next;
322 
323             if (in == NULL) {
324                 return NGX_OK;
325             }
326 
327             b = in->buf;
328         }
329 
330         dd("process field: buf size: %d", (int) ngx_buf_size(b));
331 
332         if (b->last - b->pos < (ssize_t) sizeof(uint32_t)) {
333             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
334                           "rds_csv: field size is incomplete in the buf: %*s "
335                           "(len: %d)", b->last - b->pos, b->pos,
336                           (int) (b->last - b->pos));
337 
338             return NGX_ERROR;
339         }
340 
341         total = *(uint32_t *) b->pos;
342 
343         dd("total: %d", (int) total);
344 
345         b->pos += sizeof(uint32_t);
346 
347         if (total == (uint32_t) -1) {
348             /* SQL NULL found */
349             total = 0;
350             len = 0;
351             ctx->field_data_rest = 0;
352 
353             rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
354                                                1 /* is null */);
355 
356         } else {
357             len = (uint32_t) (b->last - b->pos);
358 
359             if (len >= total) {
360                 len = total;
361             }
362 
363             ctx->field_data_rest = total - len;
364 
365             rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
366                                                0 /* not null */);
367         }
368 
369         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
370             return rc;
371         }
372 
373         b->pos += len;
374 
375         if (b->pos == b->last) {
376             in = in->next;
377         }
378 
379         if (len < total) {
380             dd("process field: need to read more field data");
381 
382             ctx->state = state_expect_more_field_data;
383 
384             return ngx_http_rds_csv_process_more_field_data(r, in, ctx);
385         }
386 
387         ctx->cur_col++;
388 
389         if (ctx->cur_col >= ctx->col_count) {
390             dd("reached the end of the current row");
391 
392             ctx->state = state_expect_row;
393 
394             return ngx_http_rds_csv_process_row(r, in, ctx);
395         }
396 
397         /* continue to process the next field (if any) */
398     }
399 
400     /* impossible to reach here */
401 
402     return NGX_ERROR;
403 }
404 
405 
406 ngx_int_t
ngx_http_rds_csv_process_more_field_data(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)407 ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r,
408     ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx)
409 {
410     ngx_int_t                    rc;
411     ngx_buf_t                   *b;
412     size_t                       len;
413 
414     for (;;) {
415         if (in == NULL) {
416             return NGX_OK;
417         }
418 
419         b = in->buf;
420 
421         if (!ngx_buf_in_memory(b)) {
422             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
423                           "rds_csv: buf from upstream not in memory");
424             return NGX_ERROR;
425         }
426 
427         len = b->last - b->pos;
428 
429         if (len >= ctx->field_data_rest) {
430             len = ctx->field_data_rest;
431             ctx->field_data_rest = 0;
432 
433         } else {
434             ctx->field_data_rest -= len;
435         }
436 
437         rc = ngx_http_rds_csv_output_more_field_data(r, ctx, b->pos, len);
438 
439         if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
440             return rc;
441         }
442 
443         b->pos += len;
444 
445         if (b->pos == b->last) {
446             in = in->next;
447         }
448 
449         if (ctx->field_data_rest) {
450             dd("process more field data: still some data remaining");
451             continue;
452         }
453 
454         dd("process more field data: reached the end of the current field");
455 
456         ctx->cur_col++;
457 
458         if (ctx->cur_col >= ctx->col_count) {
459             dd("process more field data: reached the end of the current row");
460 
461             ctx->state = state_expect_row;
462 
463             return ngx_http_rds_csv_process_row(r, in, ctx);
464         }
465 
466         dd("proces more field data: read the next field");
467 
468         ctx->state = state_expect_field;
469 
470         return ngx_http_rds_csv_process_field(r, in, ctx);
471     }
472 
473     /* impossible to reach here */
474 
475     return NGX_ERROR;
476 }
477