1 /*
2 * Copyright (C) agentzh
3 */
4
5
6 #ifndef DDEBUG
7 #define DDEBUG 0
8 #endif
9 #include "ddebug.h"
10
11
12 #include "ngx_http_rds_csv_processor.h"
13 #include "ngx_http_rds_csv_util.h"
14 #include "ngx_http_rds_csv_output.h"
15 #include "ngx_http_rds.h"
16 #include "ngx_http_rds_utils.h"
17
18
19 #include <ngx_core.h>
20 #include <ngx_http.h>
21
22
23 ngx_int_t
ngx_http_rds_csv_process_header(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)24 ngx_http_rds_csv_process_header(ngx_http_request_t *r, ngx_chain_t *in,
25 ngx_http_rds_csv_ctx_t *ctx)
26 {
27 ngx_buf_t *b;
28 ngx_http_rds_header_t header;
29 ngx_int_t rc;
30
31 if (in == NULL) {
32 return NGX_OK;
33 }
34
35 b = in->buf;
36
37 if (!ngx_buf_in_memory(b)) {
38 if (!ngx_buf_special(b)) {
39 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
40 "rds_csv: process header: buf from "
41 "upstream not in memory");
42 goto invalid;
43 }
44
45 in = in->next;
46
47 if (in == NULL) {
48 return NGX_OK;
49 }
50
51 b = in->buf;
52 }
53
54 rc = ngx_http_rds_parse_header(r, b, &header);
55
56 if (rc != NGX_OK) {
57 goto invalid;
58 }
59
60 dd("col count: %d", (int) header.col_count);
61
62 if (header.col_count == 0) {
63 /* for empty result set, just return the JSON
64 * representation of the RDS header */
65
66 dd("col count == 0");
67
68 if (b->pos != b->last) {
69 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
70 "rds_csv: header: there's unexpected remaining data "
71 "in the buf");
72
73 goto invalid;
74 }
75
76 ctx->state = state_done;
77
78 /* now we send the postponed response header */
79 if (!ctx->header_sent) {
80 ctx->header_sent = 1;
81
82 rc = ngx_http_rds_csv_next_header_filter(r);
83
84 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
85 return rc;
86 }
87 }
88
89 rc = ngx_http_rds_csv_output_header(r, ctx, &header);
90
91 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
92 return rc;
93 }
94
95 ngx_http_rds_csv_discard_bufs(r->pool, in);
96
97 return rc;
98 }
99
100 ctx->cols = ngx_palloc(r->pool,
101 header.col_count * sizeof(ngx_http_rds_column_t));
102
103 if (ctx->cols == NULL) {
104 goto invalid;
105 }
106
107 ctx->state = state_expect_col;
108 ctx->cur_col = 0;
109 ctx->col_count = header.col_count;
110
111 /* now we send the postponed response header */
112 if (!ctx->header_sent) {
113 ctx->header_sent = 1;
114
115 rc = ngx_http_rds_csv_next_header_filter(r);
116 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
117 return rc;
118 }
119 }
120
121 return ngx_http_rds_csv_process_col(r, b->pos == b->last ? in->next : in,
122 ctx);
123
124 invalid:
125
126 dd("return 500");
127 if (!ctx->header_sent) {
128 ctx->header_sent = 1;
129
130 r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
131 ngx_http_send_header(r);
132 ngx_http_send_special(r, NGX_HTTP_LAST);
133
134 return NGX_ERROR;
135 }
136
137 return NGX_ERROR;
138 }
139
140
141 ngx_int_t
ngx_http_rds_csv_process_col(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)142 ngx_http_rds_csv_process_col(ngx_http_request_t *r, ngx_chain_t *in,
143 ngx_http_rds_csv_ctx_t *ctx)
144 {
145 ngx_buf_t *b;
146 ngx_int_t rc;
147 ngx_http_rds_csv_loc_conf_t *conf;
148
149 if (in == NULL) {
150 return NGX_OK;
151 }
152
153 b = in->buf;
154
155 if (!ngx_buf_in_memory(b)) {
156 if (!ngx_buf_special(b)) {
157 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
158 "rds_csv: process col: buf from upstream not in "
159 "memory");
160 return NGX_ERROR;
161 }
162
163 in = in->next;
164
165 if (in == NULL) {
166 return NGX_OK;
167 }
168
169 b = in->buf;
170 }
171
172 dd("parsing rds column");
173
174 rc = ngx_http_rds_parse_col(r, b, &ctx->cols[ctx->cur_col]);
175
176 dd("parse col returns %d (%d)", (int) rc, (int) NGX_OK);
177
178 if (rc != NGX_OK) {
179 return NGX_ERROR;
180 }
181
182 if (b->pos == b->last) {
183 dd("parse col buf consumed");
184 in = in->next;
185 }
186
187 ctx->cur_col++;
188
189 if (ctx->cur_col >= ctx->col_count) {
190 dd("end of column list");
191
192 ctx->state = state_expect_row;
193 ctx->row = 0;
194
195 conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
196
197 if (conf->field_name_header) {
198 rc = ngx_http_rds_csv_output_field_names(r, ctx);
199
200 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
201 return rc;
202 }
203 }
204
205 dd("after output literal");
206
207 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
208 return rc;
209 }
210
211
212 dd("process col is entering process row...");
213 return ngx_http_rds_csv_process_row(r, in, ctx);
214 }
215
216 return ngx_http_rds_csv_process_col(r, in, ctx);
217 }
218
219
220 ngx_int_t
ngx_http_rds_csv_process_row(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)221 ngx_http_rds_csv_process_row(ngx_http_request_t *r, ngx_chain_t *in,
222 ngx_http_rds_csv_ctx_t *ctx)
223 {
224 ngx_buf_t *b;
225 ngx_int_t rc;
226
227 if (in == NULL) {
228 return NGX_OK;
229 }
230
231 dd("process row");
232
233 b = in->buf;
234
235 if (!ngx_buf_in_memory(b)) {
236 if (!ngx_buf_special(b)) {
237 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
238 "rds_csv: process row: buf from "
239 "upstream not in memory");
240 return NGX_ERROR;
241 }
242
243 in = in->next;
244
245 if (in == NULL) {
246 return NGX_OK;
247 }
248
249 b = in->buf;
250 }
251
252 if (b->last - b->pos < (ssize_t) sizeof(uint8_t)) {
253 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
254 "rds_csv: row flag is incomplete in the buf");
255 return NGX_ERROR;
256 }
257
258 dd("row flag: %d (offset %d)", (char) *b->pos, (int) (b->pos - b->start));
259
260 if (*b->pos++ == 0) {
261 /* end of row list */
262 ctx->state = state_done;
263
264 if (b->pos != b->last) {
265 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
266 "rds_csv: row: there's unexpected remaining data "
267 "in the buf");
268 return NGX_ERROR;
269 }
270
271 rc = ngx_http_rds_csv_output_literal(r, ctx, (u_char *) "", 0,
272 1 /* last buf*/);
273
274 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
275 return rc;
276 }
277
278 return rc;
279 }
280
281 ctx->row++;
282 ctx->cur_col = 0;
283 ctx->state = state_expect_field;
284
285 if (b->pos == b->last) {
286 in = in->next;
287
288 } else {
289 dd("process row: buf not consumed completely");
290 }
291
292 return ngx_http_rds_csv_process_field(r, in, ctx);
293 }
294
295
296 ngx_int_t
ngx_http_rds_csv_process_field(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)297 ngx_http_rds_csv_process_field(ngx_http_request_t *r, ngx_chain_t *in,
298 ngx_http_rds_csv_ctx_t *ctx)
299 {
300 size_t total, len;
301 ngx_buf_t *b;
302 ngx_int_t rc;
303
304 for (;;) {
305 if (in == NULL) {
306 return NGX_OK;
307 }
308
309 b = in->buf;
310
311 if (!ngx_buf_in_memory(b)) {
312 dd("buf not in memory");
313
314 if (!ngx_buf_special(b)) {
315 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
316 "rds_csv: process field: buf from "
317 "upstream not in memory");
318 return NGX_ERROR;
319 }
320
321 in = in->next;
322
323 if (in == NULL) {
324 return NGX_OK;
325 }
326
327 b = in->buf;
328 }
329
330 dd("process field: buf size: %d", (int) ngx_buf_size(b));
331
332 if (b->last - b->pos < (ssize_t) sizeof(uint32_t)) {
333 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
334 "rds_csv: field size is incomplete in the buf: %*s "
335 "(len: %d)", b->last - b->pos, b->pos,
336 (int) (b->last - b->pos));
337
338 return NGX_ERROR;
339 }
340
341 total = *(uint32_t *) b->pos;
342
343 dd("total: %d", (int) total);
344
345 b->pos += sizeof(uint32_t);
346
347 if (total == (uint32_t) -1) {
348 /* SQL NULL found */
349 total = 0;
350 len = 0;
351 ctx->field_data_rest = 0;
352
353 rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
354 1 /* is null */);
355
356 } else {
357 len = (uint32_t) (b->last - b->pos);
358
359 if (len >= total) {
360 len = total;
361 }
362
363 ctx->field_data_rest = total - len;
364
365 rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
366 0 /* not null */);
367 }
368
369 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
370 return rc;
371 }
372
373 b->pos += len;
374
375 if (b->pos == b->last) {
376 in = in->next;
377 }
378
379 if (len < total) {
380 dd("process field: need to read more field data");
381
382 ctx->state = state_expect_more_field_data;
383
384 return ngx_http_rds_csv_process_more_field_data(r, in, ctx);
385 }
386
387 ctx->cur_col++;
388
389 if (ctx->cur_col >= ctx->col_count) {
390 dd("reached the end of the current row");
391
392 ctx->state = state_expect_row;
393
394 return ngx_http_rds_csv_process_row(r, in, ctx);
395 }
396
397 /* continue to process the next field (if any) */
398 }
399
400 /* impossible to reach here */
401
402 return NGX_ERROR;
403 }
404
405
406 ngx_int_t
ngx_http_rds_csv_process_more_field_data(ngx_http_request_t * r,ngx_chain_t * in,ngx_http_rds_csv_ctx_t * ctx)407 ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r,
408 ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx)
409 {
410 ngx_int_t rc;
411 ngx_buf_t *b;
412 size_t len;
413
414 for (;;) {
415 if (in == NULL) {
416 return NGX_OK;
417 }
418
419 b = in->buf;
420
421 if (!ngx_buf_in_memory(b)) {
422 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
423 "rds_csv: buf from upstream not in memory");
424 return NGX_ERROR;
425 }
426
427 len = b->last - b->pos;
428
429 if (len >= ctx->field_data_rest) {
430 len = ctx->field_data_rest;
431 ctx->field_data_rest = 0;
432
433 } else {
434 ctx->field_data_rest -= len;
435 }
436
437 rc = ngx_http_rds_csv_output_more_field_data(r, ctx, b->pos, len);
438
439 if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
440 return rc;
441 }
442
443 b->pos += len;
444
445 if (b->pos == b->last) {
446 in = in->next;
447 }
448
449 if (ctx->field_data_rest) {
450 dd("process more field data: still some data remaining");
451 continue;
452 }
453
454 dd("process more field data: reached the end of the current field");
455
456 ctx->cur_col++;
457
458 if (ctx->cur_col >= ctx->col_count) {
459 dd("process more field data: reached the end of the current row");
460
461 ctx->state = state_expect_row;
462
463 return ngx_http_rds_csv_process_row(r, in, ctx);
464 }
465
466 dd("proces more field data: read the next field");
467
468 ctx->state = state_expect_field;
469
470 return ngx_http_rds_csv_process_field(r, in, ctx);
471 }
472
473 /* impossible to reach here */
474
475 return NGX_ERROR;
476 }
477