1 /* Copyright 2012-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 
4 #include "watchman.h"
5 
/* Prepare `jr` for use: zero the whole structure, then allocate an I/O
 * buffer of WATCHMAN_IO_BUF_SIZE bytes for it.
 *
 * Returns true on success, false if the allocation failed. */
bool w_json_buffer_init(w_jbuffer_t *jr)
{
  memset(jr, 0, sizeof(*jr));

  jr->allocd = WATCHMAN_IO_BUF_SIZE;
  jr->buf = malloc(jr->allocd);

  return jr->buf != NULL;
}
19 
/* Discard any buffered content by rewinding both the read and the write
 * position to the start of the buffer.  The allocation itself is kept. */
void w_json_buffer_reset(w_jbuffer_t *jr)
{
  jr->rpos = jr->wpos = 0;
}
25 
/* Release the storage owned by `jr` and zero the structure so that no
 * stale pointer or position is left behind. */
void w_json_buffer_free(w_jbuffer_t *jr)
{
  void *storage = jr->buf;

  memset(jr, 0, sizeof(*jr));
  free(storage);
}
31 
32 // Shunt down, return available size
shunt_down(w_jbuffer_t * jr)33 static inline uint32_t shunt_down(w_jbuffer_t *jr)
34 {
35   if (jr->rpos && jr->rpos == jr->wpos) {
36     jr->rpos = 0;
37     jr->wpos = 0;
38   }
39   if (jr->rpos && jr->rpos < jr->wpos) {
40     memmove(jr->buf, jr->buf + jr->rpos, jr->wpos - jr->rpos);
41     jr->wpos -= jr->rpos;
42     jr->rpos = 0;
43 
44   }
45   return jr->allocd - jr->wpos;
46 }
47 
/* Read more data from `stm` into the free space at the end of the buffer.
 *
 * Consumed bytes are first reclaimed with shunt_down(); if the buffer is
 * still completely full, its allocation is doubled.
 *
 * errno is cleared immediately before the read.  Callers in this file
 * treat a false return with errno == 0 as EOF and anything else as an
 * error, so the allocation failure paths explicitly set errno to ENOMEM.
 *
 * Returns true if at least one byte was appended, false otherwise. */
static bool fill_buffer(w_jbuffer_t *jr, w_stm_t stm)
{
  uint32_t avail;
  int r;

  avail = shunt_down(jr);

  // Get some more space if we need it
  if (avail == 0) {
    char *buf;

    // Doubling must not wrap around.
    // NOTE(review): allocd is assumed to be a 32-bit unsigned field (it is
    // assigned from a uint32_t elsewhere in this file) -- confirm against
    // the w_jbuffer_t declaration.
    if (jr->allocd > UINT32_MAX / 2) {
      errno = ENOMEM;
      return false;
    }

    buf = realloc(jr->buf, jr->allocd * 2);
    if (!buf) {
      // jr->buf is still valid; make sure this is never mistaken for EOF
      errno = ENOMEM;
      return false;
    }

    jr->buf = buf;
    jr->allocd *= 2;

    avail = jr->allocd - jr->wpos;
  }

  errno = 0;
  r = w_stm_read(stm, jr->buf + jr->wpos, avail);
  if (r <= 0) {
    return false;
  }

  jr->wpos += r;

  return true;
}
79 
/* Classify the PDU that starts at the read position.
 *
 * Returns need_data when fewer than two unread bytes are buffered, is_bser
 * when the first two unread bytes equal BSER_MAGIC, and is_json_compact
 * in every other case. */
static inline enum w_pdu_type detect_pdu(w_jbuffer_t *jr)
{
  if (jr->wpos - jr->rpos >= 2) {
    return memcmp(jr->buf + jr->rpos, BSER_MAGIC, 2) == 0
      ? is_bser
      : is_json_compact;
  }
  return need_data;
}
90 
read_json_pretty_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)91 static json_t *read_json_pretty_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
92 {
93   char *nl;
94   int r;
95   json_t *res;
96 
97   // Assume newline is at the end of what we have
98   nl = jr->buf + jr->wpos;
99   r = (int)(nl - (jr->buf + jr->rpos));
100   res = json_loadb(jr->buf + jr->rpos, r, 0, jerr);
101   if (!res) {
102     // Maybe we can fill more data into the buffer and retry?
103     if (!fill_buffer(jr, stm)) {
104       // No, then error is terminal
105       return NULL;
106     }
107     // Recompute end of buffer
108     nl = jr->buf + jr->wpos;
109     r = (int)(nl - (jr->buf + jr->rpos));
110     // And try parsing this
111     res = json_loadb(jr->buf + jr->rpos, r, 0, jerr);
112   }
113 
114   // update read pos to look beyond this point
115   jr->rpos += r + 1;
116 
117   return res;
118 }
119 
read_json_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)120 static json_t *read_json_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
121 {
122   char *nl;
123   int r;
124   json_t *res;
125 
126   /* look for a newline; that indicates the end of
127    * a json packet */
128   nl = memchr(jr->buf + jr->rpos, '\n', jr->wpos - jr->rpos);
129 
130   // If we don't have a newline, we need to fill the
131   // buffer
132   while (!nl) {
133     if (!fill_buffer(jr, stm)) {
134       if (errno == 0 && stm == w_stm_stdin()) {
135         // Ugly-ish hack to support the -j CLI option.  This allows
136         // us to consume a JSON input that doesn't end with a newline.
137         // We only allow this on EOF when reading from stdin
138         nl = jr->buf + jr->wpos;
139         break;
140       }
141       return NULL;
142     }
143     nl = memchr(jr->buf + jr->rpos, '\n', jr->wpos - jr->rpos);
144   }
145 
146   // buflen
147   r = (int)(nl - (jr->buf + jr->rpos));
148   res = json_loadb(jr->buf + jr->rpos, r, 0, jerr);
149 
150   // update read pos to look beyond this point
151   jr->rpos += r + 1;
152 
153   return res;
154 }
155 
/* Decode the integer that encodes the BSER PDU length, starting at the
 * read position, pulling more data from `stm` as required.
 *
 * bunser_int() is retried after each successful refill.  A failed decode
 * that reports `needed == -1` is treated as unrecoverable.
 *
 * On success the decoded length is stored in *len, the read position is
 * advanced by `needed`, and true is returned.  On failure a message is
 * written to jerr->text and false is returned. */
bool w_bser_decode_pdu_len(w_jbuffer_t *jr, w_stm_t stm,
    json_int_t *len, json_error_t *jerr)
{
  json_int_t needed;

  for (;;) {
    if (bunser_int(jr->buf + jr->rpos, jr->wpos - jr->rpos,
          &needed, len)) {
      break;
    }

    if (needed == -1) {
      snprintf(jerr->text, sizeof(jerr->text),
          "failed to read PDU size");
      return false;
    }

    if (!fill_buffer(jr, stm)) {
      snprintf(jerr->text, sizeof(jerr->text),
          "unable to fill buffer");
      return false;
    }
  }

  jr->rpos += (uint32_t)needed;
  return true;
}
178 
read_bser_pdu(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)179 static json_t *read_bser_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
180 {
181   json_int_t needed;
182   json_int_t val;
183   uint32_t ideal;
184   json_int_t need;
185   int r;
186   json_t *obj;
187 
188   jr->rpos += 2;
189 
190   // We don't handle EAGAIN cleanly in here
191   w_stm_set_nonblock(stm, false);
192   if (!w_bser_decode_pdu_len(jr, stm, &val, jerr)) {
193     return NULL;
194   }
195 
196   // val tells us exactly how much storage we need for this PDU
197   need = val - (jr->allocd - jr->wpos);
198   if (need > 0) {
199     ideal = jr->allocd;
200     while (ideal < (uint32_t)need) {
201       ideal *= 2;
202     }
203     if (ideal > jr->allocd) {
204       char *buf = realloc(jr->buf, ideal);
205 
206       if (!buf) {
207         snprintf(jerr->text, sizeof(jerr->text),
208             "out of memory while allocating %" PRIu32 " bytes",
209             ideal);
210         return NULL;
211       }
212 
213       jr->buf = buf;
214       jr->allocd = ideal;
215     }
216   }
217 
218   // We have enough room for the whole thing, let's read it in
219   while ((jr->wpos - jr->rpos) < val) {
220     r = w_stm_read(stm, jr->buf + jr->wpos, jr->allocd - jr->wpos);
221     if (r <= 0) {
222       snprintf(jerr->text, sizeof(jerr->text),
223           "error reading PDU: %s",
224           strerror(errno));
225       return NULL;
226     }
227     jr->wpos += r;
228   }
229 
230   obj = bunser(jr->buf + jr->rpos, jr->buf + jr->wpos, &needed, jerr);
231 
232   // Ensure that we move the read position to the wpos; we consumed it all
233   jr->rpos = jr->wpos;
234 
235   w_stm_set_nonblock(stm, true);
236   return obj;
237 }
238 
/* Make sure enough data is buffered to classify the next PDU and record
 * its type in jr->pdu_type.
 *
 * The buffer is refilled until detect_pdu() can decide; a single refill
 * may deliver fewer than the two bytes it needs.
 *
 * Returns true once the type is known.  Returns false if the buffer could
 * not be filled: jerr->text then describes the error or EOF, except when
 * errno is EAGAIN, in which case jerr->text is left untouched. */
static bool read_and_detect_pdu(w_jbuffer_t *jr, w_stm_t stm,
    json_error_t *jerr)
{
  enum w_pdu_type pdu;

  shunt_down(jr);
  pdu = detect_pdu(jr);
  // Keep reading until we can tell; never hand need_data to the decoders
  while (pdu == need_data) {
    if (!fill_buffer(jr, stm)) {
      if (errno != EAGAIN) {
        snprintf(jerr->text, sizeof(jerr->text),
          "fill_buffer: %s",
          errno ? strerror(errno) : "EOF");
      }
      return false;
    }
    pdu = detect_pdu(jr);
  }

  if (pdu == is_json_compact && stm == w_stm_stdin()) {
    // Minor hack for the `-j` option for reading pretty printed
    // json from stdin
    pdu = is_json_pretty;
  }

  jr->pdu_type = pdu;
  return true;
}
267 
/* Write `x` bytes starting at `buf` to stdout, retrying after short
 * writes.
 *
 * Returns true once everything has been written (trivially so when `x` is
 * zero or negative), false as soon as fwrite() writes nothing. */
static bool output_bytes(const char *buf, int x)
{
  const char *cursor = buf;
  int remaining;

  for (remaining = x; remaining > 0; ) {
    int wrote = (int)fwrite(cursor, 1, remaining, stdout);

    if (wrote == 0) {
      return false;
    }
    cursor += wrote;
    remaining -= wrote;
  }
  return true;
}
282 
/* Copy buffered data to stdout up to and including the next newline,
 * refilling the buffer from `stm` as needed.
 *
 * Returns false only if writing to stdout fails.  If the buffer cannot be
 * refilled before a newline is seen, what was available has already been
 * written and true is returned. */
static bool stream_until_newline(w_jbuffer_t *reader, w_stm_t stm)
{
  for (;;) {
    char *start = reader->buf + reader->rpos;
    char *nl = memchr(start, '\n', reader->wpos - reader->rpos);
    int x;

    if (nl) {
      // Emit through the newline itself
      x = 1 + (int)(nl - start);
    } else {
      // No newline yet: emit everything we have
      x = reader->wpos - reader->rpos;
    }

    if (!output_bytes(start, x)) {
      return false;
    }
    reader->rpos += x;

    if (nl || !fill_buffer(reader, stm)) {
      return true;
    }
  }
}
315 
/* Copy a PDU to stdout: first the bytes in front of the read position
 * (the header that has already been consumed), then exactly `len` bytes
 * of body, reading from `stm` whenever the buffer runs dry.
 *
 * Bytes buffered beyond the `len` body bytes are left in the buffer.
 *
 * Returns true on success.  On failure a message is written to jerr->text
 * and false is returned. */
static bool stream_n_bytes(w_jbuffer_t *jr, w_stm_t stm, json_int_t len,
    json_error_t *jerr)
{
  if (!output_bytes(jr->buf, jr->rpos)) {
    snprintf(jerr->text, sizeof(jerr->text),
        "failed output headers bytes %" PRIu32 ": %s\n",
        (uint32_t)jr->rpos, strerror(errno));
    return false;
  }
  while (len > 0) {
    uint32_t avail = jr->wpos - jr->rpos;
    uint32_t room;
    int r;

    if (avail) {
      // Never emit more than this PDU; the rest belongs to the next one
      if ((json_int_t)avail > len) {
        avail = (uint32_t)len;
      }
      if (!output_bytes(jr->buf + jr->rpos, avail)) {
        snprintf(jerr->text, sizeof(jerr->text),
            "output_bytes: avail=%" PRIu32 ", failed %s\n",
            avail, strerror(errno));
        return false;
      }
      jr->rpos += avail;
      len -= avail;

      if (len == 0) {
        return true;
      }
    }

    // Everything buffered has been emitted, so this rewinds the buffer.
    // Compare in the wide type so a huge `len` is not truncated.
    room = shunt_down(jr);
    avail = (len < (json_int_t)room) ? (uint32_t)len : room;
    r = w_stm_read(stm, jr->buf + jr->wpos, avail);

    if (r <= 0) {
      snprintf(jerr->text, sizeof(jerr->text),
        "read: len=%"PRIi64" wanted %"PRIu32" got %d %s\n",
        (int64_t)len, avail,
        r, strerror(errno));
      return false;
    }
    jr->wpos += r;
  }
  return true;
}
361 
/* Pass the PDU at the read position through to stdout without decoding
 * it, choosing the strategy from jr->pdu_type.
 *
 * JSON PDUs are streamed up to the next newline.  BSER PDUs have their
 * length decoded and then that many bytes are streamed.  Any other type
 * is logged at W_LOG_FATAL.
 *
 * Returns true on success, false on failure. */
static bool stream_pdu(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
{
  json_int_t len;

  if (jr->pdu_type == is_json_compact || jr->pdu_type == is_json_pretty) {
    return stream_until_newline(jr, stm);
  }

  if (jr->pdu_type != is_bser) {
    w_log(W_LOG_FATAL, "not streaming for pdu type %d\n", jr->pdu_type);
    return false;
  }

  // Step over the two magic bytes before decoding the length
  jr->rpos += 2;
  if (!w_bser_decode_pdu_len(jr, stm, &len, jerr)) {
    return false;
  }
  return stream_n_bytes(jr, stm, len, jerr);
}
382 
read_pdu_into_json(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)383 static json_t *read_pdu_into_json(w_jbuffer_t *jr, w_stm_t stm,
384     json_error_t *jerr)
385 {
386   switch (jr->pdu_type) {
387     case is_json_compact:
388       return read_json_pdu(jr, stm, jerr);
389     case is_json_pretty:
390       return read_json_pretty_pdu(jr, stm, jerr);
391     default:
392       return read_bser_pdu(jr, stm, jerr);
393   }
394 }
395 
/* Read one PDU from `stm` and emit it on stdout encoded as `output_pdu`.
 *
 * If the incoming PDU already has the requested encoding, its bytes are
 * streamed through unchanged.  Otherwise it is decoded and re-serialized
 * to w_stm_stdout() using `output_pdu_buf` (which is reset first) as the
 * staging buffer.
 *
 * Returns true on success; failures are logged at W_LOG_ERR and return
 * false. */
bool w_json_buffer_passthru(w_jbuffer_t *jr,
    enum w_pdu_type output_pdu,
    w_jbuffer_t *output_pdu_buf,
    w_stm_t stm)
{
  json_t *j;
  json_error_t jerr;
  bool res;

  // Some failure paths below do not write an error message; start from a
  // zeroed struct so that jerr.text is always a valid (empty) string when
  // it is logged.
  memset(&jerr, 0, sizeof(jerr));

  if (!read_and_detect_pdu(jr, stm, &jerr)) {
    w_log(W_LOG_ERR, "failed to identify PDU: %s\n",
        jerr.text);
    return false;
  }

  if (jr->pdu_type == output_pdu) {
    // We can stream it through
    if (!stream_pdu(jr, stm, &jerr)) {
      w_log(W_LOG_ERR, "stream_pdu: %s\n", jerr.text);
      return false;
    }
    return true;
  }

  j = read_pdu_into_json(jr, stm, &jerr);

  if (!j) {
    w_log(W_LOG_ERR, "failed to parse response: %s\n",
        jerr.text);
    return false;
  }

  w_json_buffer_reset(output_pdu_buf);

  res = w_ser_write_pdu(output_pdu, output_pdu_buf, w_stm_stdout(), j);

  json_decref(j);
  return res;
}
435 
w_json_buffer_next(w_jbuffer_t * jr,w_stm_t stm,json_error_t * jerr)436 json_t *w_json_buffer_next(w_jbuffer_t *jr, w_stm_t stm, json_error_t *jerr)
437 {
438   memset(jerr, 0, sizeof(*jerr));
439   if (!read_and_detect_pdu(jr, stm, jerr)) {
440     return NULL;
441   }
442   return read_pdu_into_json(jr, stm, jerr);
443 }
444 
445 struct jbuffer_write_data {
446   w_stm_t stm;
447   w_jbuffer_t *jr;
448 };
449 
jbuffer_flush(struct jbuffer_write_data * data)450 static bool jbuffer_flush(struct jbuffer_write_data *data)
451 {
452   int x;
453 
454   while (data->jr->wpos - data->jr->rpos) {
455     x = w_stm_write(data->stm, data->jr->buf + data->jr->rpos,
456         data->jr->wpos - data->jr->rpos);
457 
458     if (x <= 0) {
459       return false;
460     }
461 
462     data->jr->rpos += x;
463   }
464 
465   data->jr->rpos = data->jr->wpos = 0;
466   return true;
467 }
468 
jbuffer_write(const char * buffer,size_t size,void * ptr)469 static int jbuffer_write(const char *buffer, size_t size, void *ptr)
470 {
471   struct jbuffer_write_data *data = ptr;
472 
473   while (size) {
474     // Accumulate in the buffer
475     int room = data->jr->allocd - data->jr->wpos;
476 
477     // No room? send it over the wire
478     if (!room) {
479       if (!jbuffer_flush(data)) {
480         return -1;
481       }
482       room = data->jr->allocd - data->jr->wpos;
483     }
484 
485     if ((int)size < room) {
486       room = (int)size;
487     }
488 
489     // Stick it in the buffer
490     memcpy(data->jr->buf + data->jr->wpos,
491         buffer, room);
492 
493     buffer += room;
494     size -= room;
495     data->jr->wpos += room;
496   }
497 
498   return 0;
499 }
500 
/* Serialize `json` as a BSER PDU into `jr` and write it to `stm`.
 *
 * Returns true if w_bser_write_pdu() reported success (0) and the final
 * flush succeeded, false otherwise. */
bool w_json_buffer_write_bser(w_jbuffer_t *jr, w_stm_t stm, json_t *json)
{
  struct jbuffer_write_data data = { stm, jr };

  if (w_bser_write_pdu(json, jbuffer_write, &data) != 0) {
    return false;
  }

  // Push out whatever is still sitting in the buffer
  return jbuffer_flush(&data);
}
514 
/* Serialize `json` as JSON text (using the given jansson dump `flags`)
 * followed by a single newline, staging it in `jr` and writing it to
 * `stm`.
 *
 * Returns true only if the dump, the newline append and the final flush
 * all succeed; a later step is not attempted once an earlier one fails. */
bool w_json_buffer_write(w_jbuffer_t *jr, w_stm_t stm, json_t *json, int flags)
{
  struct jbuffer_write_data data = { stm, jr };

  return json_dump_callback(json, jbuffer_write, &data, flags) == 0
    && jbuffer_write("\n", 1, &data) == 0
    && jbuffer_flush(&data);
}
532 
/* Serialize `json` to `stm` in the encoding selected by `pdu_type`:
 * compact JSON, JSON indented by 4, or BSER.
 *
 * Returns the result of the selected writer, or false for need_data and
 * any other value that names no output encoding. */
bool w_ser_write_pdu(enum w_pdu_type pdu_type,
    w_jbuffer_t *jr, w_stm_t stm, json_t *json)
{
  if (pdu_type == is_json_compact) {
    return w_json_buffer_write(jr, stm, json, JSON_COMPACT);
  }
  if (pdu_type == is_json_pretty) {
    return w_json_buffer_write(jr, stm, json, JSON_INDENT(4));
  }
  if (pdu_type == is_bser) {
    return w_json_buffer_write_bser(jr, stm, json);
  }
  return false;
}
548 
549 /* vim:ts=2:sw=2:et:
550  */
551