1 /* Copyright 2012-present Facebook, Inc.
2 * Licensed under the Apache License, Version 2.0 */
3
4 #include "watchman.h"
5
w_json_buffer_init(w_jbuffer_t * jr)6 bool w_json_buffer_init(w_jbuffer_t *jr)
7 {
8 memset(jr, 0, sizeof(*jr));
9
10 jr->allocd = WATCHMAN_IO_BUF_SIZE;
11 jr->buf = malloc(jr->allocd);
12
13 if (!jr->buf) {
14 return false;
15 }
16
17 return true;
18 }
19
w_json_buffer_reset(w_jbuffer_t * jr)20 void w_json_buffer_reset(w_jbuffer_t *jr)
21 {
22 jr->wpos = 0;
23 jr->rpos = 0;
24 }
25
w_json_buffer_free(w_jbuffer_t * jr)26 void w_json_buffer_free(w_jbuffer_t *jr)
27 {
28 free(jr->buf);
29 memset(jr, 0, sizeof(*jr));
30 }
31
32 // Shunt down, return available size
shunt_down(w_jbuffer_t * jr)33 static inline uint32_t shunt_down(w_jbuffer_t *jr)
34 {
35 if (jr->rpos && jr->rpos == jr->wpos) {
36 jr->rpos = 0;
37 jr->wpos = 0;
38 }
39 if (jr->rpos && jr->rpos < jr->wpos) {
40 memmove(jr->buf, jr->buf + jr->rpos, jr->wpos - jr->rpos);
41 jr->wpos -= jr->rpos;
42 jr->rpos = 0;
43
44 }
45 return jr->allocd - jr->wpos;
46 }
47
fill_buffer(w_jbuffer_t * jr,w_stm_t stm)48 static bool fill_buffer(w_jbuffer_t *jr, w_stm_t stm)
49 {
50 uint32_t avail;
51 int r;
52
53 avail = shunt_down(jr);
54
55 // Get some more space if we need it
56 if (avail == 0) {
57 char *buf = realloc(jr->buf, jr->allocd * 2);
58
59 if (!buf) {
60 return false;
61 }
62
63 jr->buf = buf;
64 jr->allocd *= 2;
65
66 avail = jr->allocd - jr->wpos;
67 }
68
69 errno = 0;
70 r = w_stm_read(stm, jr->buf + jr->wpos, avail);
71 if (r <= 0) {
72 return false;
73 }
74
75 jr->wpos += r;
76
77 return true;
78 }
79
detect_pdu(w_jbuffer_t * jr)80 static inline enum w_pdu_type detect_pdu(w_jbuffer_t *jr)
81 {
82 if (jr->wpos - jr->rpos < 2) {
83 return need_data;
84 }
85 if (memcmp(jr->buf + jr->rpos, BSER_MAGIC, 2) == 0) {
86 return is_bser;
87 }
88 return is_json_compact;
89 }
90
read_json_pretty_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)91 static json_t *read_json_pretty_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
92 {
93 char *nl;
94 int r;
95 json_t *res;
96
97 // Assume newline is at the end of what we have
98 nl = jr->buf + jr->wpos;
99 r = (int)(nl - (jr->buf + jr->rpos));
100 res = json_loadb(jr->buf + jr->rpos, r, 0, jerr);
101 if (!res) {
102 // Maybe we can fill more data into the buffer and retry?
103 if (!fill_buffer(jr, stm)) {
104 // No, then error is terminal
105 return NULL;
106 }
107 // Recompute end of buffer
108 nl = jr->buf + jr->wpos;
109 r = (int)(nl - (jr->buf + jr->rpos));
110 // And try parsing this
111 res = json_loadb(jr->buf + jr->rpos, r, 0, jerr);
112 }
113
114 // update read pos to look beyond this point
115 jr->rpos += r + 1;
116
117 return res;
118 }
119
read_json_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)120 static json_t *read_json_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
121 {
122 char *nl;
123 int r;
124 json_t *res;
125
126 /* look for a newline; that indicates the end of
127 * a json packet */
128 nl = memchr(jr->buf + jr->rpos, '\n', jr->wpos - jr->rpos);
129
130 // If we don't have a newline, we need to fill the
131 // buffer
132 while (!nl) {
133 if (!fill_buffer(jr, stm)) {
134 if (errno == 0 && stm == w_stm_stdin()) {
135 // Ugly-ish hack to support the -j CLI option. This allows
136 // us to consume a JSON input that doesn't end with a newline.
137 // We only allow this on EOF when reading from stdin
138 nl = jr->buf + jr->wpos;
139 break;
140 }
141 return NULL;
142 }
143 nl = memchr(jr->buf + jr->rpos, '\n', jr->wpos - jr->rpos);
144 }
145
146 // buflen
147 r = (int)(nl - (jr->buf + jr->rpos));
148 res = json_loadb(jr->buf + jr->rpos, r, 0, jerr);
149
150 // update read pos to look beyond this point
151 jr->rpos += r + 1;
152
153 return res;
154 }
155
w_bser_decode_pdu_len(w_jbuffer_t * jr,w_stm_t stm,json_int_t * len,json_error_t * jerr)156 bool w_bser_decode_pdu_len(w_jbuffer_t *jr, w_stm_t stm,
157 json_int_t *len, json_error_t *jerr)
158 {
159 json_int_t needed;
160
161 while (!bunser_int(jr->buf + jr->rpos, jr->wpos - jr->rpos,
162 &needed, len)) {
163 if (needed == -1) {
164 snprintf(jerr->text, sizeof(jerr->text),
165 "failed to read PDU size");
166 return false;
167 }
168 if (!fill_buffer(jr, stm)) {
169 snprintf(jerr->text, sizeof(jerr->text),
170 "unable to fill buffer");
171 return false;
172 }
173 }
174 jr->rpos += (uint32_t)needed;
175
176 return true;
177 }
178
read_bser_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)179 static json_t *read_bser_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
180 {
181 json_int_t needed;
182 json_int_t val;
183 uint32_t ideal;
184 json_int_t need;
185 int r;
186 json_t *obj;
187
188 jr->rpos += 2;
189
190 // We don't handle EAGAIN cleanly in here
191 w_stm_set_nonblock(stm, false);
192 if (!w_bser_decode_pdu_len(jr, stm, &val, jerr)) {
193 return NULL;
194 }
195
196 // val tells us exactly how much storage we need for this PDU
197 need = val - (jr->allocd - jr->wpos);
198 if (need > 0) {
199 ideal = jr->allocd;
200 while (ideal < (uint32_t)need) {
201 ideal *= 2;
202 }
203 if (ideal > jr->allocd) {
204 char *buf = realloc(jr->buf, ideal);
205
206 if (!buf) {
207 snprintf(jerr->text, sizeof(jerr->text),
208 "out of memory while allocating %" PRIu32 " bytes",
209 ideal);
210 return NULL;
211 }
212
213 jr->buf = buf;
214 jr->allocd = ideal;
215 }
216 }
217
218 // We have enough room for the whole thing, let's read it in
219 while ((jr->wpos - jr->rpos) < val) {
220 r = w_stm_read(stm, jr->buf + jr->wpos, jr->allocd - jr->wpos);
221 if (r <= 0) {
222 snprintf(jerr->text, sizeof(jerr->text),
223 "error reading PDU: %s",
224 strerror(errno));
225 return NULL;
226 }
227 jr->wpos += r;
228 }
229
230 obj = bunser(jr->buf + jr->rpos, jr->buf + jr->wpos, &needed, jerr);
231
232 // Ensure that we move the read position to the wpos; we consumed it all
233 jr->rpos = jr->wpos;
234
235 w_stm_set_nonblock(stm, true);
236 return obj;
237 }
238
read_and_detect_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)239 static bool read_and_detect_pdu(w_jbuffer_t *jr, w_stm_t stm,
240 json_error_t *jerr)
241 {
242 enum w_pdu_type pdu;
243
244 shunt_down(jr);
245 pdu = detect_pdu(jr);
246 if (pdu == need_data) {
247 if (!fill_buffer(jr, stm)) {
248 if (errno != EAGAIN) {
249 snprintf(jerr->text, sizeof(jerr->text),
250 "fill_buffer: %s",
251 errno ? strerror(errno) : "EOF");
252 }
253 return false;
254 }
255 pdu = detect_pdu(jr);
256 }
257
258 if (pdu == is_json_compact && stm == w_stm_stdin()) {
259 // Minor hack for the `-j` option for reading pretty printed
260 // json from stdin
261 pdu = is_json_pretty;
262 }
263
264 jr->pdu_type = pdu;
265 return true;
266 }
267
output_bytes(const char * buf,int x)268 static bool output_bytes(const char *buf, int x)
269 {
270 int res;
271
272 while (x > 0) {
273 res = (int)fwrite(buf, 1, x, stdout);
274 if (res == 0) {
275 return false;
276 }
277 buf += res;
278 x -= res;
279 }
280 return true;
281 }
282
stream_until_newline(w_jbuffer_t * reader,w_stm_t stm)283 static bool stream_until_newline(w_jbuffer_t *reader, w_stm_t stm)
284 {
285 int x;
286 char *buf, *nl;
287 bool is_done = false;
288
289 while (true) {
290 buf = reader->buf + reader->rpos;
291 nl = memchr(buf, '\n', reader->wpos - reader->rpos);
292 if (nl) {
293 x = 1 + (int)(nl - buf);
294 is_done = true;
295 } else {
296 x = reader->wpos - reader->rpos;
297 }
298
299 if (!output_bytes(buf, x)) {
300 return false;
301 }
302 buf += x;
303 reader->rpos += x;
304
305 if (is_done) {
306 break;
307 }
308
309 if (!fill_buffer(reader, stm)) {
310 break;
311 }
312 }
313 return true;
314 }
315
stream_n_bytes(w_jbuffer_t * jr,w_stm_t stm,json_int_t len,json_error_t * jerr)316 static bool stream_n_bytes(w_jbuffer_t *jr, w_stm_t stm, json_int_t len,
317 json_error_t *jerr)
318 {
319 uint32_t total = 0;
320
321 if (!output_bytes(jr->buf, jr->rpos)) {
322 snprintf(jerr->text, sizeof(jerr->text),
323 "failed output headers bytes %d: %s\n",
324 jr->rpos, strerror(errno));
325 return false;
326 }
327 while (len > 0) {
328 uint32_t avail = jr->wpos - jr->rpos;
329 int r;
330
331 if (avail) {
332 if (!output_bytes(jr->buf + jr->rpos, avail)) {
333 snprintf(jerr->text, sizeof(jerr->text),
334 "output_bytes: avail=%d, failed %s\n",
335 avail, strerror(errno));
336 return false;
337 }
338 jr->rpos += avail;
339 len -= avail;
340
341 if (len == 0) {
342 return true;
343 }
344 }
345
346 avail = MIN((uint32_t)len, shunt_down(jr));
347 r = w_stm_read(stm, jr->buf + jr->wpos, avail);
348
349 if (r <= 0) {
350 snprintf(jerr->text, sizeof(jerr->text),
351 "read: len=%"PRIi64" wanted %"PRIu32" got %d %s\n",
352 (int64_t)len, avail,
353 r, strerror(errno));
354 return false;
355 }
356 jr->wpos += r;
357 total += r;
358 }
359 return true;
360 }
361
stream_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)362 static bool stream_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
363 {
364 switch (jr->pdu_type) {
365 case is_json_compact:
366 case is_json_pretty:
367 return stream_until_newline(jr, stm);
368 case is_bser:
369 {
370 json_int_t len;
371 jr->rpos += 2;
372 if (!w_bser_decode_pdu_len(jr, stm, &len, jerr)) {
373 return false;
374 }
375 return stream_n_bytes(jr, stm, len, jerr);
376 }
377 default:
378 w_log(W_LOG_FATAL, "not streaming for pdu type %d\n", jr->pdu_type);
379 return false;
380 }
381 }
382
read_pdu_into_json(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)383 static json_t *read_pdu_into_json(w_jbuffer_t *jr, w_stm_t stm,
384 json_error_t *jerr)
385 {
386 switch (jr->pdu_type) {
387 case is_json_compact:
388 return read_json_pdu(jr, stm, jerr);
389 case is_json_pretty:
390 return read_json_pretty_pdu(jr, stm, jerr);
391 default:
392 return read_bser_pdu(jr, stm, jerr);
393 }
394 }
395
w_json_buffer_passthru(w_jbuffer_t * jr,enum w_pdu_type output_pdu,w_jbuffer_t * output_pdu_buf,w_stm_t stm)396 bool w_json_buffer_passthru(w_jbuffer_t *jr,
397 enum w_pdu_type output_pdu,
398 w_jbuffer_t *output_pdu_buf,
399 w_stm_t stm)
400 {
401 json_t *j;
402 json_error_t jerr;
403 bool res;
404
405 if (!read_and_detect_pdu(jr, stm, &jerr)) {
406 w_log(W_LOG_ERR, "failed to identify PDU: %s\n",
407 jerr.text);
408 return false;
409 }
410
411 if (jr->pdu_type == output_pdu) {
412 // We can stream it through
413 if (!stream_pdu(jr, stm, &jerr)) {
414 w_log(W_LOG_ERR, "stream_pdu: %s\n", jerr.text);
415 return false;
416 }
417 return true;
418 }
419
420 j = read_pdu_into_json(jr, stm, &jerr);
421
422 if (!j) {
423 w_log(W_LOG_ERR, "failed to parse response: %s\n",
424 jerr.text);
425 return false;
426 }
427
428 w_json_buffer_reset(output_pdu_buf);
429
430 res = w_ser_write_pdu(output_pdu, output_pdu_buf, w_stm_stdout(), j);
431
432 json_decref(j);
433 return res;
434 }
435
w_json_buffer_next(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)436 json_t *w_json_buffer_next(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
437 {
438 memset(jerr, 0, sizeof(*jerr));
439 if (!read_and_detect_pdu(jr, stm, jerr)) {
440 return NULL;
441 }
442 return read_pdu_into_json(jr, stm, jerr);
443 }
444
445 struct jbuffer_write_data {
446 w_stm_t stm;
447 w_jbuffer_t *jr;
448 };
449
jbuffer_flush(struct jbuffer_write_data * data)450 static bool jbuffer_flush(struct jbuffer_write_data *data)
451 {
452 int x;
453
454 while (data->jr->wpos - data->jr->rpos) {
455 x = w_stm_write(data->stm, data->jr->buf + data->jr->rpos,
456 data->jr->wpos - data->jr->rpos);
457
458 if (x <= 0) {
459 return false;
460 }
461
462 data->jr->rpos += x;
463 }
464
465 data->jr->rpos = data->jr->wpos = 0;
466 return true;
467 }
468
jbuffer_write(const char * buffer,size_t size,void * ptr)469 static int jbuffer_write(const char *buffer, size_t size, void *ptr)
470 {
471 struct jbuffer_write_data *data = ptr;
472
473 while (size) {
474 // Accumulate in the buffer
475 int room = data->jr->allocd - data->jr->wpos;
476
477 // No room? send it over the wire
478 if (!room) {
479 if (!jbuffer_flush(data)) {
480 return -1;
481 }
482 room = data->jr->allocd - data->jr->wpos;
483 }
484
485 if ((int)size < room) {
486 room = (int)size;
487 }
488
489 // Stick it in the buffer
490 memcpy(data->jr->buf + data->jr->wpos,
491 buffer, room);
492
493 buffer += room;
494 size -= room;
495 data->jr->wpos += room;
496 }
497
498 return 0;
499 }
500
w_json_buffer_write_bser(w_jbuffer_t * jr,w_stm_t stm,json_t * json)501 bool w_json_buffer_write_bser(w_jbuffer_t *jr, w_stm_t stm, json_t *json)
502 {
503 struct jbuffer_write_data data = { stm, jr };
504 int res;
505
506 res = w_bser_write_pdu(json, jbuffer_write, &data);
507
508 if (res != 0) {
509 return false;
510 }
511
512 return jbuffer_flush(&data);
513 }
514
w_json_buffer_write(w_jbuffer_t * jr,w_stm_t stm,json_t * json,int flags)515 bool w_json_buffer_write(w_jbuffer_t *jr, w_stm_t stm, json_t *json, int flags)
516 {
517 struct jbuffer_write_data data = { stm, jr };
518 int res;
519
520 res = json_dump_callback(json, jbuffer_write, &data, flags);
521
522 if (res != 0) {
523 return false;
524 }
525
526 if (jbuffer_write("\n", 1, &data) != 0) {
527 return false;
528 }
529
530 return jbuffer_flush(&data);
531 }
532
w_ser_write_pdu(enum w_pdu_type pdu_type,w_jbuffer_t * jr,w_stm_t stm,json_t * json)533 bool w_ser_write_pdu(enum w_pdu_type pdu_type,
534 w_jbuffer_t *jr, w_stm_t stm, json_t *json)
535 {
536 switch (pdu_type) {
537 case is_json_compact:
538 return w_json_buffer_write(jr, stm, json, JSON_COMPACT);
539 case is_json_pretty:
540 return w_json_buffer_write(jr, stm, json, JSON_INDENT(4));
541 case is_bser:
542 return w_json_buffer_write_bser(jr, stm, json);
543 case need_data:
544 default:
545 return false;
546 }
547 }
548
549 /* vim:ts=2:sw=2:et:
550 */
551