1 /* Copyright 2012-present Facebook, Inc.
2 * Licensed under the Apache License, Version 2.0 */
3
4 #include "watchman.h"
5
// Registers the "bser-v2" capability name via the W_CAP_REG macro.
// NOTE(review): presumably this advertises BSER v2 PDU support to clients --
// confirm against the W_CAP_REG definition.
W_CAP_REG("bser-v2")
watchman_json_buffer()7 watchman_json_buffer::watchman_json_buffer()
8 : buf((char*)malloc(WATCHMAN_IO_BUF_SIZE)),
9 allocd(WATCHMAN_IO_BUF_SIZE),
10 rpos(0),
11 wpos(0),
12 pdu_type(need_data),
13 capabilities(0) {
14 if (!buf) {
15 throw std::bad_alloc();
16 }
17 }
18
clear()19 void watchman_json_buffer::clear() {
20 wpos = 0;
21 rpos = 0;
22 }
23
~watchman_json_buffer()24 watchman_json_buffer::~watchman_json_buffer() {
25 free(buf);
26 }
27
28 // Shunt down, return available size
shuntDown()29 uint32_t watchman_json_buffer::shuntDown() {
30 if (rpos && rpos == wpos) {
31 rpos = 0;
32 wpos = 0;
33 }
34 if (rpos && rpos < wpos) {
35 memmove(buf, buf + rpos, wpos - rpos);
36 wpos -= rpos;
37 rpos = 0;
38 }
39 return allocd - wpos;
40 }
41
fillBuffer(w_stm_t stm)42 bool watchman_json_buffer::fillBuffer(w_stm_t stm) {
43 uint32_t avail;
44 int r;
45
46 avail = shuntDown();
47
48 // Get some more space if we need it
49 if (avail == 0) {
50 char* newBuf = (char*)realloc(buf, allocd * 2);
51
52 if (!newBuf) {
53 return false;
54 }
55
56 buf = newBuf;
57 allocd *= 2;
58
59 avail = allocd - wpos;
60 }
61
62 errno = 0;
63 r = stm->read(buf + wpos, avail);
64 if (r <= 0) {
65 return false;
66 }
67
68 wpos += r;
69
70 return true;
71 }
72
detectPdu()73 inline enum w_pdu_type watchman_json_buffer::detectPdu() {
74 if (wpos - rpos < 2) {
75 return need_data;
76 }
77 if (memcmp(buf + rpos, BSER_MAGIC, 2) == 0) {
78 return is_bser;
79 }
80 if (memcmp(buf + rpos, BSER_V2_MAGIC, 2) == 0) {
81 return is_bser_v2;
82 }
83 return is_json_compact;
84 }
85
readJsonPrettyPdu(w_stm_t stm,json_error_t * jerr)86 json_ref watchman_json_buffer::readJsonPrettyPdu(
87 w_stm_t stm,
88 json_error_t* jerr) {
89 char *nl;
90 int r;
91 json_ref res;
92
93 // Assume newline is at the end of what we have
94 nl = buf + wpos;
95 r = (int)(nl - (buf + rpos));
96 res = json_loadb(buf + rpos, r, 0, jerr);
97 while (!res) {
98 // Maybe we can fill more data into the buffer and retry?
99 if (!fillBuffer(stm)) {
100 // No, then error is terminal
101 return nullptr;
102 }
103 // Recompute end of buffer
104 nl = buf + wpos;
105 r = (int)(nl - (buf + rpos));
106 // And try parsing this
107 res = json_loadb(buf + rpos, r, 0, jerr);
108 }
109
110 // update read pos to look beyond this point
111 rpos += r + 1;
112
113 return res;
114 }
115
readJsonPdu(w_stm_t stm,json_error_t * jerr)116 json_ref watchman_json_buffer::readJsonPdu(w_stm_t stm, json_error_t* jerr) {
117 int r;
118
119 /* look for a newline; that indicates the end of
120 * a json packet */
121 auto nl = (char*)memchr(buf + rpos, '\n', wpos - rpos);
122
123 // If we don't have a newline, we need to fill the
124 // buffer
125 while (!nl) {
126 if (!fillBuffer(stm)) {
127 if (errno == 0 && stm == w_stm_stdin()) {
128 // Ugly-ish hack to support the -j CLI option. This allows
129 // us to consume a JSON input that doesn't end with a newline.
130 // We only allow this on EOF when reading from stdin
131 nl = buf + wpos;
132 break;
133 }
134 return nullptr;
135 }
136 nl = (char*)memchr(buf + rpos, '\n', wpos - rpos);
137 }
138
139 // buflen
140 r = (int)(nl - (buf + rpos));
141 auto res = json_loadb(buf + rpos, r, 0, jerr);
142
143 // update read pos to look beyond this point
144 rpos += r + 1;
145
146 return res;
147 }
148
decodePduInfo(w_stm_t stm,uint32_t bser_version,json_int_t * len,json_int_t * bser_capabilities,json_error_t * jerr)149 bool watchman_json_buffer::decodePduInfo(
150 w_stm_t stm,
151 uint32_t bser_version,
152 json_int_t* len,
153 json_int_t* bser_capabilities,
154 json_error_t* jerr) {
155 json_int_t needed;
156 if (bser_version == 2) {
157 uint32_t capabilities;
158 while (wpos - rpos < sizeof(capabilities)) {
159 if (!fillBuffer(stm)) {
160 snprintf(jerr->text, sizeof(jerr->text),
161 "unable to fill buffer");
162 return false;
163 }
164 }
165 // json_int_t is architecture-dependent, so go through the uint32_t for
166 // safety.
167 memcpy(&capabilities, buf + rpos, sizeof(capabilities));
168 *bser_capabilities = capabilities;
169 rpos += sizeof(capabilities);
170 }
171
172 while (!bunser_int(buf + rpos, wpos - rpos, &needed, len)) {
173 if (needed == -1) {
174 snprintf(jerr->text, sizeof(jerr->text),
175 "failed to read PDU size");
176 return false;
177 }
178 if (!fillBuffer(stm)) {
179 snprintf(jerr->text, sizeof(jerr->text),
180 "unable to fill buffer");
181 return false;
182 }
183 }
184 rpos += (uint32_t)needed;
185
186 return true;
187 }
188
readBserPdu(w_stm_t stm,uint32_t bser_version,json_error_t * jerr)189 json_ref watchman_json_buffer::readBserPdu(
190 w_stm_t stm,
191 uint32_t bser_version,
192 json_error_t* jerr) {
193 json_int_t needed;
194 json_int_t val;
195 json_int_t bser_capabilities;
196 uint32_t ideal;
197 int r;
198 json_ref obj;
199
200 rpos += 2;
201
202 // We don't handle EAGAIN cleanly in here
203 stm->setNonBlock(false);
204 if (!decodePduInfo(stm, bser_version, &val, &bser_capabilities, jerr)) {
205 return nullptr;
206 }
207
208 // val tells us exactly how much storage we need for this PDU
209 if (val > allocd - wpos) {
210 ideal = allocd;
211 while ((ideal - wpos) < (uint32_t)val) {
212 ideal *= 2;
213 }
214 if (ideal > allocd) {
215 auto newBuf = (char*)realloc(buf, ideal);
216
217 if (!newBuf) {
218 snprintf(jerr->text, sizeof(jerr->text),
219 "out of memory while allocating %" PRIu32 " bytes",
220 ideal);
221 return nullptr;
222 }
223
224 buf = newBuf;
225 allocd = ideal;
226 }
227 }
228
229 // We have enough room for the whole thing, let's read it in
230 while ((wpos - rpos) < val) {
231 r = stm->read(buf + wpos, allocd - wpos);
232 if (r <= 0) {
233 jerr->position = wpos - rpos;
234 snprintf(
235 jerr->text,
236 sizeof(jerr->text),
237 "error reading %" PRIu32 " bytes val=%" PRIu64
238 " wpos=%" PRIu32 " rpos=%" PRIu32 " for PDU: %s",
239 uint32_t(allocd - wpos),
240 int64_t(val),
241 wpos,
242 rpos,
243 strerror(errno));
244 return nullptr;
245 }
246 wpos += r;
247 }
248
249 obj = bunser(buf + rpos, buf + wpos, &needed, jerr);
250
251 // Ensure that we move the read position to the wpos; we consumed it all
252 rpos = wpos;
253
254 stm->setNonBlock(true);
255 return obj;
256 }
257
readAndDetectPdu(w_stm_t stm,json_error_t * jerr)258 bool watchman_json_buffer::readAndDetectPdu(w_stm_t stm, json_error_t* jerr) {
259 enum w_pdu_type pdu;
260 // The client might send us different kinds of PDUs over the same connection,
261 // so reset the capabilities.
262 capabilities = 0;
263
264 shuntDown();
265 pdu = detectPdu();
266 if (pdu == need_data) {
267 if (!fillBuffer(stm)) {
268 if (errno != EAGAIN) {
269 snprintf(jerr->text, sizeof(jerr->text),
270 "fill_buffer: %s",
271 errno ? strerror(errno) : "EOF");
272 }
273 return false;
274 }
275 pdu = detectPdu();
276 }
277
278 if (pdu == is_bser_v2) {
279 // read capabilities (since we haven't increased rpos, first two bytes are
280 // still the header)
281 while (wpos - rpos < 2 + sizeof(capabilities)) {
282 if (!fillBuffer(stm)) {
283 if (errno != EAGAIN) {
284 snprintf(
285 jerr->text,
286 sizeof(jerr->text),
287 "fillBuffer: %s",
288 errno ? strerror(errno) : "EOF");
289 }
290 return false;
291 }
292 }
293
294 // Copy the capabilities over. BSER is system-endian so this is safe.
295 memcpy(&capabilities, buf + rpos + 2, sizeof(capabilities));
296 }
297
298 if (pdu == is_json_compact && stm == w_stm_stdin()) {
299 // Minor hack for the `-j` option for reading pretty printed
300 // json from stdin
301 pdu = is_json_pretty;
302 }
303
304 pdu_type = pdu;
305 return true;
306 }
307
// Writes exactly `x` bytes starting at `buf` to stdout, retrying after short
// writes. Returns false as soon as fwrite reports that nothing was written;
// returns true otherwise (including when x <= 0, where nothing is written).
static bool output_bytes(const char *buf, int x)
{
  const char* cursor = buf;
  int remaining = x;

  while (remaining > 0) {
    const int written = static_cast<int>(fwrite(cursor, 1, remaining, stdout));
    if (written == 0) {
      return false;
    }
    cursor += written;
    remaining -= written;
  }
  return true;
}
322
streamUntilNewLine(w_stm_t stm)323 bool watchman_json_buffer::streamUntilNewLine(w_stm_t stm) {
324 int x;
325 char* localBuf;
326 bool is_done = false;
327
328 while (true) {
329 localBuf = buf + rpos;
330 auto nl = (char*)memchr(localBuf, '\n', wpos - rpos);
331 if (nl) {
332 x = 1 + (int)(nl - localBuf);
333 is_done = true;
334 } else {
335 x = wpos - rpos;
336 }
337
338 if (!output_bytes(localBuf, x)) {
339 return false;
340 }
341 localBuf += x;
342 rpos += x;
343
344 if (is_done) {
345 break;
346 }
347
348 if (!fillBuffer(stm)) {
349 break;
350 }
351 }
352 return true;
353 }
354
streamN(w_stm_t stm,json_int_t len,json_error_t * jerr)355 bool watchman_json_buffer::streamN(
356 w_stm_t stm,
357 json_int_t len,
358 json_error_t* jerr) {
359 uint32_t total = 0;
360
361 if (!output_bytes(buf, rpos)) {
362 snprintf(
363 jerr->text,
364 sizeof(jerr->text),
365 "failed output headers bytes %d: %s\n",
366 rpos,
367 strerror(errno));
368 return false;
369 }
370 while (len > 0) {
371 uint32_t avail = wpos - rpos;
372 int r;
373
374 if (avail) {
375 if (!output_bytes(buf + rpos, avail)) {
376 snprintf(
377 jerr->text,
378 sizeof(jerr->text),
379 "output_bytes: avail=%d, failed %s\n",
380 avail,
381 strerror(errno));
382 return false;
383 }
384 rpos += avail;
385 len -= avail;
386
387 if (len == 0) {
388 return true;
389 }
390 }
391
392 avail = std::min((uint32_t)len, shuntDown());
393 r = stm->read(buf + wpos, avail);
394
395 if (r <= 0) {
396 snprintf(
397 jerr->text,
398 sizeof(jerr->text),
399 "read: len=%" PRIi64 " wanted %" PRIu32 " got %d %s\n",
400 (int64_t)len,
401 avail,
402 r,
403 strerror(errno));
404 return false;
405 }
406 wpos += r;
407 total += r;
408 }
409 return true;
410 }
411
streamPdu(w_stm_t stm,json_error_t * jerr)412 bool watchman_json_buffer::streamPdu(w_stm_t stm, json_error_t* jerr) {
413 uint32_t bser_version = 1;
414 json_int_t bser_capabilities;
415 json_int_t len;
416
417 switch (pdu_type) {
418 case is_json_compact:
419 case is_json_pretty:
420 return streamUntilNewLine(stm);
421 case is_bser:
422 case is_bser_v2:
423 {
424 if (pdu_type == is_bser_v2) {
425 bser_version = 2;
426 } else {
427 bser_version = 1;
428 }
429 rpos += 2;
430 if (!decodePduInfo(stm, bser_version, &len, &bser_capabilities, jerr)) {
431 return false;
432 }
433 return streamN(stm, len, jerr);
434 }
435 default:
436 w_log(W_LOG_FATAL, "not streaming for pdu type %d\n", pdu_type);
437 return false;
438 }
439 }
440
decodePdu(w_stm_t stm,json_error_t * jerr)441 json_ref watchman_json_buffer::decodePdu(w_stm_t stm, json_error_t* jerr) {
442 switch (pdu_type) {
443 case is_json_compact:
444 return readJsonPdu(stm, jerr);
445 case is_json_pretty:
446 return readJsonPrettyPdu(stm, jerr);
447 case is_bser_v2:
448 return readBserPdu(stm, 2, jerr);
449 default: // bser v1
450 return readBserPdu(stm, 1, jerr);
451 }
452 }
453
passThru(enum w_pdu_type output_pdu,uint32_t output_capabilities,w_jbuffer_t * output_pdu_buf,w_stm_t stm)454 bool watchman_json_buffer::passThru(
455 enum w_pdu_type output_pdu,
456 uint32_t output_capabilities,
457 w_jbuffer_t* output_pdu_buf,
458 w_stm_t stm) {
459 json_error_t jerr;
460 bool res;
461
462 stm->setNonBlock(false);
463 if (!readAndDetectPdu(stm, &jerr)) {
464 w_log(W_LOG_ERR, "failed to identify PDU: %s\n",
465 jerr.text);
466 return false;
467 }
468
469 if (pdu_type == output_pdu) {
470 // We can stream it through
471 if (!streamPdu(stm, &jerr)) {
472 w_log(W_LOG_ERR, "stream_pdu: %s\n", jerr.text);
473 return false;
474 }
475 return true;
476 }
477
478 auto j = decodePdu(stm, &jerr);
479
480 if (!j) {
481 w_log(W_LOG_ERR, "failed to parse response: %s\n",
482 jerr.text);
483 return false;
484 }
485
486 output_pdu_buf->clear();
487
488 res = output_pdu_buf->pduEncodeToStream(
489 output_pdu, output_capabilities, j, w_stm_stdout());
490
491 return res;
492 }
493
decodeNext(w_stm_t stm,json_error_t * jerr)494 json_ref watchman_json_buffer::decodeNext(w_stm_t stm, json_error_t* jerr) {
495 memset(jerr, 0, sizeof(*jerr));
496 if (!readAndDetectPdu(stm, jerr)) {
497 return nullptr;
498 }
499 return decodePdu(stm, jerr);
500 }
501
502 struct jbuffer_write_data {
503 w_stm_t stm;
504 w_jbuffer_t* jr;
505
flushjbuffer_write_data506 bool flush() {
507 int x;
508
509 while (jr->wpos - jr->rpos) {
510 x = stm->write(jr->buf + jr->rpos, jr->wpos - jr->rpos);
511
512 if (x <= 0) {
513 return false;
514 }
515
516 jr->rpos += x;
517 }
518
519 jr->clear();
520 return true;
521 }
522
writejbuffer_write_data523 static int write(const char* buffer, size_t size, void* ptr) {
524 auto data = (jbuffer_write_data*)ptr;
525 return data->write(buffer, size);
526 }
527
writejbuffer_write_data528 int write(const char* buffer, size_t size) {
529 while (size) {
530 // Accumulate in the buffer
531 int room = jr->allocd - jr->wpos;
532
533 // No room? send it over the wire
534 if (!room) {
535 if (!flush()) {
536 return -1;
537 }
538 room = jr->allocd - jr->wpos;
539 }
540
541 if ((int)size < room) {
542 room = (int)size;
543 }
544
545 // Stick it in the buffer
546 memcpy(jr->buf + jr->wpos, buffer, room);
547
548 buffer += room;
549 size -= room;
550 jr->wpos += room;
551 }
552
553 return 0;
554 }
555 };
556
bserEncodeToStream(uint32_t bser_version,uint32_t bser_capabilities,const json_ref & json,w_stm_t stm)557 bool watchman_json_buffer::bserEncodeToStream(
558 uint32_t bser_version,
559 uint32_t bser_capabilities,
560 const json_ref& json,
561 w_stm_t stm) {
562 struct jbuffer_write_data data = {stm, this};
563 int res;
564
565 res = w_bser_write_pdu(
566 bser_version, bser_capabilities, jbuffer_write_data::write, json, &data);
567
568 if (res != 0) {
569 return false;
570 }
571
572 return data.flush();
573 }
574
jsonEncodeToStream(const json_ref & json,w_stm_t stm,int flags)575 bool watchman_json_buffer::jsonEncodeToStream(
576 const json_ref& json,
577 w_stm_t stm,
578 int flags) {
579 struct jbuffer_write_data data = {stm, this};
580 int res;
581
582 res = json_dump_callback(json, jbuffer_write_data::write, &data, flags);
583
584 if (res != 0) {
585 return false;
586 }
587
588 if (data.write("\n", 1) != 0) {
589 return false;
590 }
591
592 return data.flush();
593 }
594
pduEncodeToStream(enum w_pdu_type pdu_type,uint32_t capabilities,const json_ref & json,w_stm_t stm)595 bool watchman_json_buffer::pduEncodeToStream(
596 enum w_pdu_type pdu_type,
597 uint32_t capabilities,
598 const json_ref& json,
599 w_stm_t stm) {
600 switch (pdu_type) {
601 case is_json_compact:
602 return jsonEncodeToStream(json, stm, JSON_COMPACT);
603 case is_json_pretty:
604 return jsonEncodeToStream(json, stm, JSON_INDENT(4));
605 case is_bser:
606 return bserEncodeToStream(1, capabilities, json, stm);
607 case is_bser_v2:
608 return bserEncodeToStream(2, capabilities, json, stm);
609 case need_data:
610 default:
611 return false;
612 }
613 }
614
615 /* vim:ts=2:sw=2:et:
616 */
617