1 /* Copyright 2012-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 
4 #include "watchman.h"
5 
// NOTE(review): presumably registers "bser-v2" as a capability name through
// the W_CAP_REG macro, which is defined outside this file -- confirm against
// the macro definition.
W_CAP_REG("bser-v2")
watchman_json_buffer()7 watchman_json_buffer::watchman_json_buffer()
8     : buf((char*)malloc(WATCHMAN_IO_BUF_SIZE)),
9       allocd(WATCHMAN_IO_BUF_SIZE),
10       rpos(0),
11       wpos(0),
12       pdu_type(need_data),
13       capabilities(0) {
14   if (!buf) {
15     throw std::bad_alloc();
16   }
17 }
18 
clear()19 void watchman_json_buffer::clear() {
20   wpos = 0;
21   rpos = 0;
22 }
23 
~watchman_json_buffer()24 watchman_json_buffer::~watchman_json_buffer() {
25   free(buf);
26 }
27 
28 // Shunt down, return available size
shuntDown()29 uint32_t watchman_json_buffer::shuntDown() {
30   if (rpos && rpos == wpos) {
31     rpos = 0;
32     wpos = 0;
33   }
34   if (rpos && rpos < wpos) {
35     memmove(buf, buf + rpos, wpos - rpos);
36     wpos -= rpos;
37     rpos = 0;
38   }
39   return allocd - wpos;
40 }
41 
fillBuffer(w_stm_t stm)42 bool watchman_json_buffer::fillBuffer(w_stm_t stm) {
43   uint32_t avail;
44   int r;
45 
46   avail = shuntDown();
47 
48   // Get some more space if we need it
49   if (avail == 0) {
50     char* newBuf = (char*)realloc(buf, allocd * 2);
51 
52     if (!newBuf) {
53       return false;
54     }
55 
56     buf = newBuf;
57     allocd *= 2;
58 
59     avail = allocd - wpos;
60   }
61 
62   errno = 0;
63   r = stm->read(buf + wpos, avail);
64   if (r <= 0) {
65     return false;
66   }
67 
68   wpos += r;
69 
70   return true;
71 }
72 
detectPdu()73 inline enum w_pdu_type watchman_json_buffer::detectPdu() {
74   if (wpos - rpos < 2) {
75     return need_data;
76   }
77   if (memcmp(buf + rpos, BSER_MAGIC, 2) == 0) {
78     return is_bser;
79   }
80   if (memcmp(buf + rpos, BSER_V2_MAGIC, 2) == 0) {
81     return is_bser_v2;
82   }
83   return is_json_compact;
84 }
85 
readJsonPrettyPdu(w_stm_t stm,json_error_t * jerr)86 json_ref watchman_json_buffer::readJsonPrettyPdu(
87     w_stm_t stm,
88     json_error_t* jerr) {
89   char *nl;
90   int r;
91   json_ref res;
92 
93   // Assume newline is at the end of what we have
94   nl = buf + wpos;
95   r = (int)(nl - (buf + rpos));
96   res = json_loadb(buf + rpos, r, 0, jerr);
97   while (!res) {
98     // Maybe we can fill more data into the buffer and retry?
99     if (!fillBuffer(stm)) {
100       // No, then error is terminal
101       return nullptr;
102     }
103     // Recompute end of buffer
104     nl = buf + wpos;
105     r = (int)(nl - (buf + rpos));
106     // And try parsing this
107     res = json_loadb(buf + rpos, r, 0, jerr);
108   }
109 
110   // update read pos to look beyond this point
111   rpos += r + 1;
112 
113   return res;
114 }
115 
readJsonPdu(w_stm_t stm,json_error_t * jerr)116 json_ref watchman_json_buffer::readJsonPdu(w_stm_t stm, json_error_t* jerr) {
117   int r;
118 
119   /* look for a newline; that indicates the end of
120    * a json packet */
121   auto nl = (char*)memchr(buf + rpos, '\n', wpos - rpos);
122 
123   // If we don't have a newline, we need to fill the
124   // buffer
125   while (!nl) {
126     if (!fillBuffer(stm)) {
127       if (errno == 0 && stm == w_stm_stdin()) {
128         // Ugly-ish hack to support the -j CLI option.  This allows
129         // us to consume a JSON input that doesn't end with a newline.
130         // We only allow this on EOF when reading from stdin
131         nl = buf + wpos;
132         break;
133       }
134       return nullptr;
135     }
136     nl = (char*)memchr(buf + rpos, '\n', wpos - rpos);
137   }
138 
139   // buflen
140   r = (int)(nl - (buf + rpos));
141   auto res = json_loadb(buf + rpos, r, 0, jerr);
142 
143   // update read pos to look beyond this point
144   rpos += r + 1;
145 
146   return res;
147 }
148 
decodePduInfo(w_stm_t stm,uint32_t bser_version,json_int_t * len,json_int_t * bser_capabilities,json_error_t * jerr)149 bool watchman_json_buffer::decodePduInfo(
150     w_stm_t stm,
151     uint32_t bser_version,
152     json_int_t* len,
153     json_int_t* bser_capabilities,
154     json_error_t* jerr) {
155   json_int_t needed;
156   if (bser_version == 2) {
157     uint32_t capabilities;
158     while (wpos - rpos < sizeof(capabilities)) {
159       if (!fillBuffer(stm)) {
160         snprintf(jerr->text, sizeof(jerr->text),
161             "unable to fill buffer");
162         return false;
163       }
164     }
165     // json_int_t is architecture-dependent, so go through the uint32_t for
166     // safety.
167     memcpy(&capabilities, buf + rpos, sizeof(capabilities));
168     *bser_capabilities = capabilities;
169     rpos += sizeof(capabilities);
170   }
171 
172   while (!bunser_int(buf + rpos, wpos - rpos, &needed, len)) {
173     if (needed == -1) {
174       snprintf(jerr->text, sizeof(jerr->text),
175           "failed to read PDU size");
176       return false;
177     }
178     if (!fillBuffer(stm)) {
179       snprintf(jerr->text, sizeof(jerr->text),
180           "unable to fill buffer");
181       return false;
182     }
183   }
184   rpos += (uint32_t)needed;
185 
186   return true;
187 }
188 
readBserPdu(w_stm_t stm,uint32_t bser_version,json_error_t * jerr)189 json_ref watchman_json_buffer::readBserPdu(
190     w_stm_t stm,
191     uint32_t bser_version,
192     json_error_t* jerr) {
193   json_int_t needed;
194   json_int_t val;
195   json_int_t bser_capabilities;
196   uint32_t ideal;
197   int r;
198   json_ref obj;
199 
200   rpos += 2;
201 
202   // We don't handle EAGAIN cleanly in here
203   stm->setNonBlock(false);
204   if (!decodePduInfo(stm, bser_version, &val, &bser_capabilities, jerr)) {
205     return nullptr;
206   }
207 
208   // val tells us exactly how much storage we need for this PDU
209   if (val > allocd - wpos) {
210     ideal = allocd;
211     while ((ideal - wpos) < (uint32_t)val) {
212       ideal *= 2;
213     }
214     if (ideal > allocd) {
215       auto newBuf = (char*)realloc(buf, ideal);
216 
217       if (!newBuf) {
218         snprintf(jerr->text, sizeof(jerr->text),
219             "out of memory while allocating %" PRIu32 " bytes",
220             ideal);
221         return nullptr;
222       }
223 
224       buf = newBuf;
225       allocd = ideal;
226     }
227   }
228 
229   // We have enough room for the whole thing, let's read it in
230   while ((wpos - rpos) < val) {
231     r = stm->read(buf + wpos, allocd - wpos);
232     if (r <= 0) {
233       jerr->position = wpos - rpos;
234       snprintf(
235           jerr->text,
236           sizeof(jerr->text),
237           "error reading %" PRIu32 " bytes val=%" PRIu64
238           " wpos=%" PRIu32 " rpos=%" PRIu32 " for PDU: %s",
239           uint32_t(allocd - wpos),
240           int64_t(val),
241           wpos,
242           rpos,
243           strerror(errno));
244       return nullptr;
245     }
246     wpos += r;
247   }
248 
249   obj = bunser(buf + rpos, buf + wpos, &needed, jerr);
250 
251   // Ensure that we move the read position to the wpos; we consumed it all
252   rpos = wpos;
253 
254   stm->setNonBlock(true);
255   return obj;
256 }
257 
readAndDetectPdu(w_stm_t stm,json_error_t * jerr)258 bool watchman_json_buffer::readAndDetectPdu(w_stm_t stm, json_error_t* jerr) {
259   enum w_pdu_type pdu;
260   // The client might send us different kinds of PDUs over the same connection,
261   // so reset the capabilities.
262   capabilities = 0;
263 
264   shuntDown();
265   pdu = detectPdu();
266   if (pdu == need_data) {
267     if (!fillBuffer(stm)) {
268       if (errno != EAGAIN) {
269         snprintf(jerr->text, sizeof(jerr->text),
270           "fill_buffer: %s",
271           errno ? strerror(errno) : "EOF");
272       }
273       return false;
274     }
275     pdu = detectPdu();
276   }
277 
278   if (pdu == is_bser_v2) {
279     // read capabilities (since we haven't increased rpos, first two bytes are
280     // still the header)
281     while (wpos - rpos < 2 + sizeof(capabilities)) {
282       if (!fillBuffer(stm)) {
283         if (errno != EAGAIN) {
284           snprintf(
285               jerr->text,
286               sizeof(jerr->text),
287               "fillBuffer: %s",
288               errno ? strerror(errno) : "EOF");
289         }
290         return false;
291       }
292     }
293 
294     // Copy the capabilities over. BSER is system-endian so this is safe.
295     memcpy(&capabilities, buf + rpos + 2, sizeof(capabilities));
296   }
297 
298   if (pdu == is_json_compact && stm == w_stm_stdin()) {
299     // Minor hack for the `-j` option for reading pretty printed
300     // json from stdin
301     pdu = is_json_pretty;
302   }
303 
304   pdu_type = pdu;
305   return true;
306 }
307 
// Write exactly `x` bytes starting at `buf` to stdout, retrying after short
// writes.  Returns false as soon as fwrite reports that nothing was written;
// a count that is zero or negative writes nothing and returns true.
static bool output_bytes(const char *buf, int x)
{
  for (int remaining = x; remaining > 0;) {
    const int wrote = (int)fwrite(buf, 1, remaining, stdout);
    if (wrote == 0) {
      return false;
    }
    buf += wrote;
    remaining -= wrote;
  }
  return true;
}
322 
streamUntilNewLine(w_stm_t stm)323 bool watchman_json_buffer::streamUntilNewLine(w_stm_t stm) {
324   int x;
325   char* localBuf;
326   bool is_done = false;
327 
328   while (true) {
329     localBuf = buf + rpos;
330     auto nl = (char*)memchr(localBuf, '\n', wpos - rpos);
331     if (nl) {
332       x = 1 + (int)(nl - localBuf);
333       is_done = true;
334     } else {
335       x = wpos - rpos;
336     }
337 
338     if (!output_bytes(localBuf, x)) {
339       return false;
340     }
341     localBuf += x;
342     rpos += x;
343 
344     if (is_done) {
345       break;
346     }
347 
348     if (!fillBuffer(stm)) {
349       break;
350     }
351   }
352   return true;
353 }
354 
streamN(w_stm_t stm,json_int_t len,json_error_t * jerr)355 bool watchman_json_buffer::streamN(
356     w_stm_t stm,
357     json_int_t len,
358     json_error_t* jerr) {
359   uint32_t total = 0;
360 
361   if (!output_bytes(buf, rpos)) {
362     snprintf(
363         jerr->text,
364         sizeof(jerr->text),
365         "failed output headers bytes %d: %s\n",
366         rpos,
367         strerror(errno));
368     return false;
369   }
370   while (len > 0) {
371     uint32_t avail = wpos - rpos;
372     int r;
373 
374     if (avail) {
375       if (!output_bytes(buf + rpos, avail)) {
376         snprintf(
377             jerr->text,
378             sizeof(jerr->text),
379             "output_bytes: avail=%d, failed %s\n",
380             avail,
381             strerror(errno));
382         return false;
383       }
384       rpos += avail;
385       len -= avail;
386 
387       if (len == 0) {
388         return true;
389       }
390     }
391 
392     avail = std::min((uint32_t)len, shuntDown());
393     r = stm->read(buf + wpos, avail);
394 
395     if (r <= 0) {
396       snprintf(
397           jerr->text,
398           sizeof(jerr->text),
399           "read: len=%" PRIi64 " wanted %" PRIu32 " got %d %s\n",
400           (int64_t)len,
401           avail,
402           r,
403           strerror(errno));
404       return false;
405     }
406     wpos += r;
407     total += r;
408   }
409   return true;
410 }
411 
streamPdu(w_stm_t stm,json_error_t * jerr)412 bool watchman_json_buffer::streamPdu(w_stm_t stm, json_error_t* jerr) {
413   uint32_t bser_version = 1;
414   json_int_t bser_capabilities;
415   json_int_t len;
416 
417   switch (pdu_type) {
418     case is_json_compact:
419     case is_json_pretty:
420       return streamUntilNewLine(stm);
421     case is_bser:
422     case is_bser_v2:
423       {
424       if (pdu_type == is_bser_v2) {
425         bser_version = 2;
426         } else {
427           bser_version = 1;
428         }
429         rpos += 2;
430         if (!decodePduInfo(stm, bser_version, &len, &bser_capabilities, jerr)) {
431           return false;
432         }
433         return streamN(stm, len, jerr);
434       }
435     default:
436       w_log(W_LOG_FATAL, "not streaming for pdu type %d\n", pdu_type);
437       return false;
438   }
439 }
440 
decodePdu(w_stm_t stm,json_error_t * jerr)441 json_ref watchman_json_buffer::decodePdu(w_stm_t stm, json_error_t* jerr) {
442   switch (pdu_type) {
443     case is_json_compact:
444       return readJsonPdu(stm, jerr);
445     case is_json_pretty:
446       return readJsonPrettyPdu(stm, jerr);
447     case is_bser_v2:
448       return readBserPdu(stm, 2, jerr);
449     default: // bser v1
450       return readBserPdu(stm, 1, jerr);
451   }
452 }
453 
passThru(enum w_pdu_type output_pdu,uint32_t output_capabilities,w_jbuffer_t * output_pdu_buf,w_stm_t stm)454 bool watchman_json_buffer::passThru(
455     enum w_pdu_type output_pdu,
456     uint32_t output_capabilities,
457     w_jbuffer_t* output_pdu_buf,
458     w_stm_t stm) {
459   json_error_t jerr;
460   bool res;
461 
462   stm->setNonBlock(false);
463   if (!readAndDetectPdu(stm, &jerr)) {
464     w_log(W_LOG_ERR, "failed to identify PDU: %s\n",
465         jerr.text);
466     return false;
467   }
468 
469   if (pdu_type == output_pdu) {
470     // We can stream it through
471     if (!streamPdu(stm, &jerr)) {
472       w_log(W_LOG_ERR, "stream_pdu: %s\n", jerr.text);
473       return false;
474     }
475     return true;
476   }
477 
478   auto j = decodePdu(stm, &jerr);
479 
480   if (!j) {
481     w_log(W_LOG_ERR, "failed to parse response: %s\n",
482         jerr.text);
483     return false;
484   }
485 
486   output_pdu_buf->clear();
487 
488   res = output_pdu_buf->pduEncodeToStream(
489       output_pdu, output_capabilities, j, w_stm_stdout());
490 
491   return res;
492 }
493 
decodeNext(w_stm_t stm,json_error_t * jerr)494 json_ref watchman_json_buffer::decodeNext(w_stm_t stm, json_error_t* jerr) {
495   memset(jerr, 0, sizeof(*jerr));
496   if (!readAndDetectPdu(stm, jerr)) {
497     return nullptr;
498   }
499   return decodePdu(stm, jerr);
500 }
501 
502 struct jbuffer_write_data {
503   w_stm_t stm;
504   w_jbuffer_t* jr;
505 
flushjbuffer_write_data506   bool flush() {
507     int x;
508 
509     while (jr->wpos - jr->rpos) {
510       x = stm->write(jr->buf + jr->rpos, jr->wpos - jr->rpos);
511 
512       if (x <= 0) {
513         return false;
514       }
515 
516       jr->rpos += x;
517     }
518 
519     jr->clear();
520     return true;
521   }
522 
writejbuffer_write_data523   static int write(const char* buffer, size_t size, void* ptr) {
524     auto data = (jbuffer_write_data*)ptr;
525     return data->write(buffer, size);
526   }
527 
writejbuffer_write_data528   int write(const char* buffer, size_t size) {
529     while (size) {
530       // Accumulate in the buffer
531       int room = jr->allocd - jr->wpos;
532 
533       // No room? send it over the wire
534       if (!room) {
535         if (!flush()) {
536           return -1;
537         }
538         room = jr->allocd - jr->wpos;
539       }
540 
541       if ((int)size < room) {
542         room = (int)size;
543       }
544 
545       // Stick it in the buffer
546       memcpy(jr->buf + jr->wpos, buffer, room);
547 
548       buffer += room;
549       size -= room;
550       jr->wpos += room;
551     }
552 
553     return 0;
554   }
555 };
556 
bserEncodeToStream(uint32_t bser_version,uint32_t bser_capabilities,const json_ref & json,w_stm_t stm)557 bool watchman_json_buffer::bserEncodeToStream(
558     uint32_t bser_version,
559     uint32_t bser_capabilities,
560     const json_ref& json,
561     w_stm_t stm) {
562   struct jbuffer_write_data data = {stm, this};
563   int res;
564 
565   res = w_bser_write_pdu(
566       bser_version, bser_capabilities, jbuffer_write_data::write, json, &data);
567 
568   if (res != 0) {
569     return false;
570   }
571 
572   return data.flush();
573 }
574 
jsonEncodeToStream(const json_ref & json,w_stm_t stm,int flags)575 bool watchman_json_buffer::jsonEncodeToStream(
576     const json_ref& json,
577     w_stm_t stm,
578     int flags) {
579   struct jbuffer_write_data data = {stm, this};
580   int res;
581 
582   res = json_dump_callback(json, jbuffer_write_data::write, &data, flags);
583 
584   if (res != 0) {
585     return false;
586   }
587 
588   if (data.write("\n", 1) != 0) {
589     return false;
590   }
591 
592   return data.flush();
593 }
594 
pduEncodeToStream(enum w_pdu_type pdu_type,uint32_t capabilities,const json_ref & json,w_stm_t stm)595 bool watchman_json_buffer::pduEncodeToStream(
596     enum w_pdu_type pdu_type,
597     uint32_t capabilities,
598     const json_ref& json,
599     w_stm_t stm) {
600   switch (pdu_type) {
601     case is_json_compact:
602       return jsonEncodeToStream(json, stm, JSON_COMPACT);
603     case is_json_pretty:
604       return jsonEncodeToStream(json, stm, JSON_INDENT(4));
605     case is_bser:
606       return bserEncodeToStream(1, capabilities, json, stm);
607     case is_bser_v2:
608       return bserEncodeToStream(2, capabilities, json, stm);
609     case need_data:
610     default:
611       return false;
612   }
613 }
614 
615 /* vim:ts=2:sw=2:et:
616  */
617