1 /*
2 Copyright (c) 2014, Facebook, Inc.
3 All rights reserved.
4 
Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7 
8  * Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 
11  * Redistributions in binary form must reproduce the above copyright notice,
12    this list of conditions and the following disclaimer in the documentation
13    and/or other materials provided with the distribution.
14 
15  * Neither the name Facebook nor the names of its contributors may be used to
16    endorse or promote products derived from this software without specific
17    prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30 
#include "watchman.h"

#if defined(HAVE_RUBY_ST_H)
#include <ruby/st.h>
#elif defined(HAVE_ST_H)
#include <st.h>
#else
#error no st.h header found
#endif

#include <errno.h>      /* for errno, EINTR (portable location) */
#include <fcntl.h>      /* for fcntl() */
#include <stdint.h>     /* for INT8_MIN/INT8_MAX etc. */
#include <string.h>     /* for memcpy(), memcmp() */
#include <sys/errno.h>  /* for errno */
#include <sys/socket.h> /* for recv(), MSG_PEEK */
44 
/*
 * Growable byte buffer that a Ruby object is serialized into.
 *
 * `data` owns `cap` bytes of heap storage; only the first `len` of them
 * hold encoded output so far.
 */
typedef struct {
    uint8_t *data; /* heap storage holding the encoded bytes */
    size_t cap;    /* number of bytes allocated at `data` */
    size_t len;    /* number of bytes of `data` filled so far */
} watchman_t;
50 
// Forward declarations (the dump/load entry points are called recursively by
// the per-type helpers defined below):
VALUE watchman_load(char **ptr, char *end);
void watchman_dump(watchman_t *w, VALUE serializable);

// Initial capacity of a watchman_t buffer; also added in by watchman_append
// when it has to grow the buffer.
#define WATCHMAN_DEFAULT_STORAGE 4096

// Two-byte prefix that starts every binary (BSER) PDU.
#define WATCHMAN_BINARY_MARKER   "\x00\x01"
// One-byte type markers that precede each encoded value.
// NOTE: these expand to int literals, so sizeof() of one is sizeof(int), not 1.
#define WATCHMAN_ARRAY_MARKER    0x00
#define WATCHMAN_HASH_MARKER     0x01
#define WATCHMAN_STRING_MARKER   0x02
#define WATCHMAN_INT8_MARKER     0x03
#define WATCHMAN_INT16_MARKER    0x04
#define WATCHMAN_INT32_MARKER    0x05
#define WATCHMAN_INT64_MARKER    0x06
#define WATCHMAN_FLOAT_MARKER    0x07
#define WATCHMAN_TRUE            0x08
#define WATCHMAN_FALSE           0x09
#define WATCHMAN_NIL             0x0a
#define WATCHMAN_TEMPLATE_MARKER 0x0b
// In a template row, marks a field with no value: the key is left out of that
// row's hash (see watchman_load_template).
#define WATCHMAN_SKIP_MARKER     0x0c

// Blank PDU header written by watchman_init: the binary marker, an int64
// marker ("\x06") and eight zero bytes that RubyWatchman_dump later overwrites
// with the payload length.
#define WATCHMAN_HEADER \
        WATCHMAN_BINARY_MARKER \
        "\x06" \
        "\x00\x00\x00\x00\x00\x00\x00\x00"

// Single-byte copies of the markers, so that their address can be passed to
// watchman_append.
static const char watchman_array_marker  = WATCHMAN_ARRAY_MARKER;
static const char watchman_hash_marker   = WATCHMAN_HASH_MARKER;
static const char watchman_string_marker = WATCHMAN_STRING_MARKER;
static const char watchman_true          = WATCHMAN_TRUE;
static const char watchman_false         = WATCHMAN_FALSE;
static const char watchman_nil           = WATCHMAN_NIL;
83 
84 /**
85  * Appends `len` bytes, starting at `data`, to the watchman_t struct `w`
86  *
87  * Will attempt to reallocate the underlying storage if it is not sufficient.
88  */
89 void watchman_append(watchman_t *w, const char *data, size_t len) {
90     if (w->len + len > w->cap) {
91         w->cap += w->len + WATCHMAN_DEFAULT_STORAGE;
92         REALLOC_N(w->data, uint8_t, w->cap);
93     }
94     memcpy(w->data + w->len, data, len);
95     w->len += len;
96 }
97 
98 /**
99  * Allocate a new watchman_t struct
100  *
101  * The struct has a small amount of extra capacity preallocated, and a blank
102  * header that can be filled in later to describe the PDU.
103  */
104 watchman_t *watchman_init() {
105     watchman_t *w = ALLOC(watchman_t);
106     w->cap = WATCHMAN_DEFAULT_STORAGE;
107     w->len = 0;
108     w->data = ALLOC_N(uint8_t, WATCHMAN_DEFAULT_STORAGE);
109 
110     watchman_append(w, WATCHMAN_HEADER, sizeof(WATCHMAN_HEADER) - 1);
111     return w;
112 }
113 
114 /**
115  * Free a watchman_t struct `w` that was previously allocated with
116  * `watchman_init`
117  */
118 void watchman_free(watchman_t *w) {
119     xfree(w->data);
120     xfree(w);
121 }
122 
123 /**
124  * Encodes and appends the integer `num` to `w`
125  */
126 void watchman_dump_int(watchman_t *w, int64_t num) {
127     char encoded[1 + sizeof(int64_t)];
128 
129     if (num == (int8_t)num) {
130         encoded[0] = WATCHMAN_INT8_MARKER;
131         encoded[1] = (int8_t)num;
132         watchman_append(w, encoded, 1 + sizeof(int8_t));
133     } else if (num == (int16_t)num) {
134         encoded[0] = WATCHMAN_INT16_MARKER;
135         *(int16_t *)(encoded + 1) = (int16_t)num;
136         watchman_append(w, encoded, 1 + sizeof(int16_t));
137     } else if (num == (int32_t)num) {
138         encoded[0] = WATCHMAN_INT32_MARKER;
139         *(int32_t *)(encoded + 1) = (int32_t)num;
140         watchman_append(w, encoded, 1 + sizeof(int32_t));
141     } else {
142         encoded[0] = WATCHMAN_INT64_MARKER;
143         *(int64_t *)(encoded + 1) = (int64_t)num;
144         watchman_append(w, encoded, 1 + sizeof(int64_t));
145     }
146 }
147 
148 /**
149  * Encodes and appends the string `string` to `w`
150  */
151 void watchman_dump_string(watchman_t *w, VALUE string) {
152     watchman_append(w, &watchman_string_marker, sizeof(watchman_string_marker));
153     watchman_dump_int(w, RSTRING_LEN(string));
154     watchman_append(w, RSTRING_PTR(string), RSTRING_LEN(string));
155 }
156 
157 /**
158  * Encodes and appends the double `num` to `w`
159  */
160 void watchman_dump_double(watchman_t *w, double num) {
161     char encoded[1 + sizeof(double)];
162     encoded[0] = WATCHMAN_FLOAT_MARKER;
163     *(double *)(encoded + 1) = num;
164     watchman_append(w, encoded, sizeof(encoded));
165 }
166 
167 /**
168  * Encodes and appends the array `array` to `w`
169  */
170 void watchman_dump_array(watchman_t *w, VALUE array) {
171     long i;
172     watchman_append(w, &watchman_array_marker, sizeof(watchman_array_marker));
173     watchman_dump_int(w, RARRAY_LEN(array));
174     for (i = 0; i < RARRAY_LEN(array); i++) {
175         watchman_dump(w, rb_ary_entry(array, i));
176     }
177 }
178 
179 /**
180  * Helper method that encodes and appends a key/value pair (`key`, `value`) from
181  * a hash to the watchman_t struct passed in via `data`
182  */
183 int watchman_dump_hash_iterator(VALUE key, VALUE value, VALUE data) {
184     watchman_t *w = (watchman_t *)data;
185     watchman_dump_string(w, StringValue(key));
186     watchman_dump(w, value);
187     return ST_CONTINUE;
188 }
189 
190 /**
191  * Encodes and appends the hash `hash` to `w`
192  */
193 void watchman_dump_hash(watchman_t *w, VALUE hash) {
194     watchman_append(w, &watchman_hash_marker, sizeof(watchman_hash_marker));
195     watchman_dump_int(w, RHASH_SIZE(hash));
196     rb_hash_foreach(hash, watchman_dump_hash_iterator, (VALUE)w);
197 }
198 
199 /**
200  * Encodes and appends the serialized Ruby object `serializable` to `w`
201  *
202  * Examples of serializable objects include arrays, hashes, strings, numbers
203  * (integers, floats), booleans, and nil.
204  */
205 void watchman_dump(watchman_t *w, VALUE serializable) {
206     switch (TYPE(serializable)) {
207         case T_ARRAY:
208             return watchman_dump_array(w, serializable);
209         case T_HASH:
210             return watchman_dump_hash(w, serializable);
211         case T_STRING:
212             return watchman_dump_string(w, serializable);
213         case T_FIXNUM: // up to 63 bits
214             return watchman_dump_int(w, FIX2LONG(serializable));
215         case T_BIGNUM:
216             return watchman_dump_int(w, NUM2LL(serializable));
217         case T_FLOAT:
218             return watchman_dump_double(w, NUM2DBL(serializable));
219         case T_TRUE:
220             return watchman_append(w, &watchman_true, sizeof(watchman_true));
221         case T_FALSE:
222             return watchman_append(w, &watchman_false, sizeof(watchman_false));
223         case T_NIL:
224             return watchman_append(w, &watchman_nil, sizeof(watchman_nil));
225         default:
226             rb_raise(rb_eTypeError, "unsupported type");
227     }
228 }
229 
230 /**
231  * Extract and return the int encoded at `ptr`
232  *
233  * Moves `ptr` past the extracted int.
234  *
235  * Will raise an ArgumentError if extracting the int would take us beyond the
236  * end of the buffer indicated by `end`, or if there is no int encoded at `ptr`.
237  *
238  * @returns The extracted int
239  */
240 int64_t watchman_load_int(char **ptr, char *end) {
241     char *val_ptr = *ptr + sizeof(int8_t);
242     int64_t val = 0;
243 
244     if (val_ptr >= end) {
245         rb_raise(rb_eArgError, "insufficient int storage");
246     }
247 
248     switch (*ptr[0]) {
249         case WATCHMAN_INT8_MARKER:
250             if (val_ptr + sizeof(int8_t) > end) {
251                 rb_raise(rb_eArgError, "overrun extracting int8_t");
252             }
253             val = *(int8_t *)val_ptr;
254             *ptr = val_ptr + sizeof(int8_t);
255             break;
256         case WATCHMAN_INT16_MARKER:
257             if (val_ptr + sizeof(int16_t) > end) {
258                 rb_raise(rb_eArgError, "overrun extracting int16_t");
259             }
260             val = *(int16_t *)val_ptr;
261             *ptr = val_ptr + sizeof(int16_t);
262             break;
263         case WATCHMAN_INT32_MARKER:
264             if (val_ptr + sizeof(int32_t) > end) {
265                 rb_raise(rb_eArgError, "overrun extracting int32_t");
266             }
267             val = *(int32_t *)val_ptr;
268             *ptr = val_ptr + sizeof(int32_t);
269             break;
270         case WATCHMAN_INT64_MARKER:
271             if (val_ptr + sizeof(int64_t) > end) {
272                 rb_raise(rb_eArgError, "overrun extracting int64_t");
273             }
274             val = *(int64_t *)val_ptr;
275             *ptr = val_ptr + sizeof(int64_t);
276             break;
277         default:
278             rb_raise(
279                 rb_eArgError,
280                 "bad integer marker 0x%02x",
281                 (unsigned int)*ptr[0]
282             );
283             break;
284     }
285 
286     return val;
287 }
288 
289 /**
290  * Reads and returns a string encoded in the Watchman binary protocol format,
291  * starting at `ptr` and finishing at or before `end`
292  */
293 VALUE watchman_load_string(char **ptr, char *end) {
294     if (*ptr >= end) {
295         rb_raise(rb_eArgError, "unexpected end of input");
296     }
297 
298     if (*ptr[0] != WATCHMAN_STRING_MARKER) {
299         rb_raise(rb_eArgError, "not a number");
300     }
301 
302     *ptr += sizeof(int8_t);
303     if (*ptr >= end) {
304         rb_raise(rb_eArgError, "invalid string header");
305     }
306 
307     int64_t len = watchman_load_int(ptr, end);
308     if (len == 0) { // special case for zero-length strings
309         return rb_str_new2("");
310     } else if (*ptr + len > end) {
311         rb_raise(rb_eArgError, "insufficient string storage");
312     }
313 
314     VALUE string = rb_str_new(*ptr, len);
315     *ptr += len;
316     return string;
317 }
318 
319 /**
320  * Reads and returns a double encoded in the Watchman binary protocol format,
321  * starting at `ptr` and finishing at or before `end`
322  */
323 double watchman_load_double(char **ptr, char *end) {
324     *ptr += sizeof(int8_t); // caller has already verified the marker
325     if (*ptr + sizeof(double) > end) {
326         rb_raise(rb_eArgError, "insufficient double storage");
327     }
328     double val = *(double *)*ptr;
329     *ptr += sizeof(double);
330     return val;
331 }
332 
333 /**
334  * Helper method which returns length of the array encoded in the Watchman
335  * binary protocol format, starting at `ptr` and finishing at or before `end`
336  */
337 int64_t watchman_load_array_header(char **ptr, char *end) {
338     if (*ptr >= end) {
339         rb_raise(rb_eArgError, "unexpected end of input");
340     }
341 
342     // verify and consume marker
343     if (*ptr[0] != WATCHMAN_ARRAY_MARKER) {
344         rb_raise(rb_eArgError, "not an array");
345     }
346     *ptr += sizeof(int8_t);
347 
348     // expect a count
349     if (*ptr + sizeof(int8_t) * 2 > end) {
350         rb_raise(rb_eArgError, "incomplete array header");
351     }
352     return watchman_load_int(ptr, end);
353 }
354 
355 /**
356  * Reads and returns an array encoded in the Watchman binary protocol format,
357  * starting at `ptr` and finishing at or before `end`
358  */
359 VALUE watchman_load_array(char **ptr, char *end) {
360     int64_t count, i;
361     VALUE array;
362 
363     count = watchman_load_array_header(ptr, end);
364     array = rb_ary_new2(count);
365 
366     for (i = 0; i < count; i++) {
367         rb_ary_push(array, watchman_load(ptr, end));
368     }
369 
370     return array;
371 }
372 
373 /**
374  * Reads and returns a hash encoded in the Watchman binary protocol format,
375  * starting at `ptr` and finishing at or before `end`
376  */
377 VALUE watchman_load_hash(char **ptr, char *end) {
378     int64_t count, i;
379     VALUE hash, key, value;
380 
381     *ptr += sizeof(int8_t); // caller has already verified the marker
382 
383     // expect a count
384     if (*ptr + sizeof(int8_t) * 2 > end) {
385         rb_raise(rb_eArgError, "incomplete hash header");
386     }
387     count = watchman_load_int(ptr, end);
388 
389     hash = rb_hash_new();
390 
391     for (i = 0; i < count; i++) {
392         key = watchman_load_string(ptr, end);
393         value = watchman_load(ptr, end);
394         rb_hash_aset(hash, key, value);
395     }
396 
397     return hash;
398 }
399 
400 /**
401  * Reads and returns a templated array encoded in the Watchman binary protocol
402  * format, starting at `ptr` and finishing at or before `end`
403  *
404  * Templated arrays are arrays of hashes which have repetitive key information
405  * pulled out into a separate "headers" prefix.
406  *
407  * @see https://github.com/facebook/watchman/blob/master/BSER.markdown
408  */
409 VALUE watchman_load_template(char **ptr, char *end) {
410     int64_t header_items_count, i, row_count;
411     VALUE array, hash, header, key, value;
412 
413     *ptr += sizeof(int8_t); // caller has already verified the marker
414 
415     // process template header array
416     header_items_count = watchman_load_array_header(ptr, end);
417     header = rb_ary_new2(header_items_count);
418     for (i = 0; i < header_items_count; i++) {
419         rb_ary_push(header, watchman_load_string(ptr, end));
420     }
421 
422     // process row items
423     row_count = watchman_load_int(ptr, end);
424     array = rb_ary_new2(header_items_count);
425     while (row_count--) {
426         hash = rb_hash_new();
427         for (i = 0; i < header_items_count; i++) {
428             if (*ptr >= end) {
429                 rb_raise(rb_eArgError, "unexpected end of input");
430             }
431 
432             if (*ptr[0] == WATCHMAN_SKIP_MARKER) {
433                 *ptr += sizeof(uint8_t);
434             } else {
435                 value = watchman_load(ptr, end);
436                 key = rb_ary_entry(header, i);
437                 rb_hash_aset(hash, key, value);
438             }
439         }
440         rb_ary_push(array, hash);
441     }
442     return array;
443 }
444 
445 /**
446  * Reads and returns an object encoded in the Watchman binary protocol format,
447  * starting at `ptr` and finishing at or before `end`
448  */
449 VALUE watchman_load(char **ptr, char *end) {
450     if (*ptr >= end) {
451         rb_raise(rb_eArgError, "unexpected end of input");
452     }
453 
454     switch (*ptr[0]) {
455         case WATCHMAN_ARRAY_MARKER:
456             return watchman_load_array(ptr, end);
457         case WATCHMAN_HASH_MARKER:
458             return watchman_load_hash(ptr, end);
459         case WATCHMAN_STRING_MARKER:
460             return watchman_load_string(ptr, end);
461         case WATCHMAN_INT8_MARKER:
462         case WATCHMAN_INT16_MARKER:
463         case WATCHMAN_INT32_MARKER:
464         case WATCHMAN_INT64_MARKER:
465             return LL2NUM(watchman_load_int(ptr, end));
466         case WATCHMAN_FLOAT_MARKER:
467             return rb_float_new(watchman_load_double(ptr, end));
468         case WATCHMAN_TRUE:
469             *ptr += 1;
470             return Qtrue;
471         case WATCHMAN_FALSE:
472             *ptr += 1;
473             return Qfalse;
474         case WATCHMAN_NIL:
475             *ptr += 1;
476             return Qnil;
477         case WATCHMAN_TEMPLATE_MARKER:
478             return watchman_load_template(ptr, end);
479         default:
480             rb_raise(rb_eTypeError, "unsupported type");
481     }
482 
483     return Qnil; // keep the compiler happy
484 }
485 
486 /**
487  * RubyWatchman.load(serialized)
488  *
489  * Converts the binary object, `serialized`, from the Watchman binary protocol
490  * format into a normal Ruby object.
491  */
492 VALUE RubyWatchman_load(VALUE self, VALUE serialized) {
493     serialized = StringValue(serialized);
494     long len = RSTRING_LEN(serialized);
495     char *ptr = RSTRING_PTR(serialized);
496     char *end = ptr + len;
497 
498     // expect at least the binary marker and a int8_t length counter
499     if ((size_t)len < sizeof(WATCHMAN_BINARY_MARKER) - 1 + sizeof(int8_t) * 2) {
500         rb_raise(rb_eArgError, "undersized header");
501     }
502 
503     int mismatched =
504         memcmp(ptr, WATCHMAN_BINARY_MARKER, sizeof(WATCHMAN_BINARY_MARKER) - 1);
505     if (mismatched) {
506         rb_raise(rb_eArgError, "missing binary marker");
507     }
508 
509     // get size marker
510     ptr += sizeof(WATCHMAN_BINARY_MARKER) - 1;
511     uint64_t payload_size = watchman_load_int(&ptr, end);
512     if (!payload_size) {
513         rb_raise(rb_eArgError, "empty payload");
514     }
515 
516     // sanity check length
517     if (ptr + payload_size != end) {
518         rb_raise(
519             rb_eArgError,
520             "payload size mismatch (%lu)",
521             (unsigned long)(end - (ptr + payload_size))
522         );
523     }
524 
525     VALUE loaded = watchman_load(&ptr, end);
526 
527     // one more sanity check
528     if (ptr != end) {
529         rb_raise(
530             rb_eArgError,
531             "payload termination mismatch (%lu)",
532             (unsigned long)(end - ptr)
533         );
534     }
535 
536     return loaded;
537 }
538 
539 /**
540  * RubyWatchman.dump(serializable)
541  *
542  * Converts the Ruby object, `serializable`, into a binary string in the
543  * Watchman binary protocol format.
544  *
545  * Examples of serializable objects include arrays, hashes, strings, numbers
546  * (integers, floats), booleans, and nil.
547  */
548 VALUE RubyWatchman_dump(VALUE self, VALUE serializable) {
549     watchman_t *w = watchman_init();
550     watchman_dump(w, serializable);
551 
552     // update header with final length information
553     uint64_t *len =
554         (uint64_t *)(w->data + sizeof(WATCHMAN_HEADER) - sizeof(uint64_t) - 1);
555     *len = w->len - sizeof(WATCHMAN_HEADER) + 1;
556 
557     // prepare final return value
558     VALUE serialized = rb_str_buf_new(w->len);
559     rb_str_buf_cat(serialized, (const char*)w->data, w->len);
560     watchman_free(w);
561     return serialized;
562 }
563 
// How far we have to look to figure out the size of the PDU header: the
// binary marker plus the one-byte integer marker that introduces the length.
// Parenthesized so the macro composes safely inside larger expressions.
#define WATCHMAN_SNIFF_BUFFER_SIZE \
    (sizeof(WATCHMAN_BINARY_MARKER) - 1 + sizeof(int8_t))

// How far we have to peek, at most, to figure out the size of the PDU itself:
// the sniffed bytes plus the widest (int64_t) length value. The integer marker
// is a single byte on the wire, so count it as sizeof(int8_t);
// sizeof(WATCHMAN_INT64_MARKER) would be sizeof(int).
#define WATCHMAN_PEEK_BUFFER_SIZE \
    (sizeof(WATCHMAN_BINARY_MARKER) - 1 + \
     sizeof(int8_t) + \
     sizeof(int64_t))
573 
574 /**
575  * RubyWatchman.query(query, socket)
576  *
577  * Converts `query`, a Watchman query comprising Ruby objects, into the Watchman
578  * binary protocol format, transmits it over socket, and unserializes and
579  * returns the result.
580  */
581 VALUE RubyWatchman_query(VALUE self, VALUE query, VALUE socket) {
582     VALUE error = Qnil;
583     VALUE errorClass = Qnil;
584     VALUE loaded = Qnil;
585     char *buffer = NULL;
586     int fileno = NUM2INT(rb_funcall(socket, rb_intern("fileno"), 0));
587 
588     // do blocking I/O to simplify the following logic
589     int flags = fcntl(fileno, F_GETFL);
590     if (
591         !(flags & O_NONBLOCK) &&
592         fcntl(fileno, F_SETFL, flags & ~O_NONBLOCK) == -1
593     ) {
594         error = rb_str_new2("unable to clear O_NONBLOCK flag");
595         goto cleanup;
596     }
597 
598     // send the message
599     VALUE serialized = RubyWatchman_dump(self, query);
600     long query_len = RSTRING_LEN(serialized);
601     ssize_t sent = send(fileno, RSTRING_PTR(serialized), query_len, 0);
602     if (sent == -1) {
603         goto system_call_fail;
604     } else if (sent != query_len) {
605         error = rb_str_new2("sent byte count mismatch");
606         goto cleanup;
607     }
608 
609     // sniff to see how large the header is
610     int8_t peek[WATCHMAN_PEEK_BUFFER_SIZE];
611     ssize_t received =
612         recv(fileno, peek, WATCHMAN_SNIFF_BUFFER_SIZE, MSG_PEEK | MSG_WAITALL);
613     if (received == -1) {
614         goto system_call_fail;
615     } else if (received != WATCHMAN_SNIFF_BUFFER_SIZE) {
616         error = rb_str_new2("failed to sniff PDU header");
617         goto cleanup;
618     }
619 
620     // peek at size of PDU
621     int8_t sizes[] = { 0, 0, 0, 1, 2, 4, 8 };
622     int8_t sizes_idx = peek[sizeof(WATCHMAN_BINARY_MARKER) - 1];
623     if (sizes_idx < WATCHMAN_INT8_MARKER || sizes_idx > WATCHMAN_INT64_MARKER) {
624         error = rb_str_new2("bad PDU size marker");
625         goto cleanup;
626     }
627     ssize_t peek_size = sizeof(WATCHMAN_BINARY_MARKER) - 1 + sizeof(int8_t) +
628         sizes[sizes_idx];
629 
630     received = recv(fileno, peek, peek_size, MSG_PEEK);
631     if (received == -1) {
632         goto system_call_fail;
633     } else if (received != peek_size) {
634         error = rb_str_new2("failed to peek at PDU header");
635         goto cleanup;
636     }
637     int8_t *pdu_size_ptr =
638         peek + sizeof(WATCHMAN_BINARY_MARKER) - sizeof(int8_t);
639     int64_t payload_size =
640         peek_size +
641         watchman_load_int((char **)&pdu_size_ptr, (char *)peek + peek_size);
642 
643     // actually read the PDU
644     buffer = xmalloc(payload_size);
645     if (!buffer) {
646         errorClass = rb_eNoMemError;
647         error = rb_str_new2("failed to allocate");
648         goto cleanup;
649     }
650     received = recv(fileno, buffer, payload_size, MSG_WAITALL);
651     if (received == -1) {
652         goto system_call_fail;
653     } else if (received != payload_size) {
654         error = rb_str_new2("failed to load PDU");
655         goto cleanup;
656     }
657 
658     if (!(flags & O_NONBLOCK) && fcntl(fileno, F_SETFL, flags) == -1) {
659         error = rb_str_new2("unable to restore fnctl flags");
660         goto cleanup;
661     }
662 
663     char *payload = buffer + peek_size;
664     loaded = watchman_load(&payload, payload + payload_size);
665     goto cleanup;
666 
667 system_call_fail:
668     errorClass = rb_eSystemCallError;
669     error = INT2FIX(errno);
670 
671 cleanup:
672     if (buffer) {
673         xfree(buffer);
674     }
675 
676     if (!(flags & O_NONBLOCK) && fcntl(fileno, F_SETFL, flags) == -1) {
677         rb_raise(rb_eRuntimeError, "unable to restore fnctl flags");
678     }
679 
680     if (NIL_P(errorClass)) {
681         errorClass = rb_eRuntimeError;
682     }
683 
684     if (!NIL_P(error)) {
685         rb_exc_raise(rb_class_new_instance(1, &error, errorClass));
686     }
687 
688     return loaded;
689 }
690 
691 VALUE mRubyWatchman = 0; // module RubyWatchman
692 
693 void Init_ext() {
694     mRubyWatchman = rb_define_module("RubyWatchman");
695     rb_define_singleton_method(mRubyWatchman, "load", RubyWatchman_load, 1);
696     rb_define_singleton_method(mRubyWatchman, "dump", RubyWatchman_dump, 1);
697     rb_define_singleton_method(mRubyWatchman, "query", RubyWatchman_query, 2);
698 }
699