1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <errno.h>
21 #include <netdb.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <unistd.h>
26 
27 #include <thrift/c_glib/thrift.h>
28 #include <thrift/c_glib/transport/thrift_transport.h>
29 #include <thrift/c_glib/transport/thrift_buffered_transport.h>
30 
/* object properties
 *
 * Identifiers passed as property_id to
 * thrift_buffered_transport_get_property and
 * thrift_buffered_transport_set_property, and used when the properties are
 * installed in thrift_buffered_transport_class_init. */
enum _ThriftBufferedTransportProperties
{
  PROP_0, /* id 0; no property is installed under this id */
  PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT, /* "transport" */
  PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE, /* "r_buf_size" */
  PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE, /* "w_buf_size" */
  PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION, /* "configuration" */
  PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE, /* "remainingmessagesize" */
  PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE /* "knowmessagesize" */
};
42 
/* Defines the ThriftBufferedTransport GObject type, with
 * THRIFT_TYPE_TRANSPORT as its parent type.  The instance and class
 * initializers are thrift_buffered_transport_init and
 * thrift_buffered_transport_class_init below. */
G_DEFINE_TYPE(ThriftBufferedTransport, thrift_buffered_transport, THRIFT_TYPE_TRANSPORT)
44 
45 /* implements thrift_transport_is_open */
46 gboolean
47 thrift_buffered_transport_is_open (ThriftTransport *transport)
48 {
49   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
50   return THRIFT_TRANSPORT_GET_CLASS (t->transport)->is_open (t->transport);
51 }
52 
53 /* overrides thrift_transport_peek */
54 gboolean
thrift_buffered_transport_peek(ThriftTransport * transport,GError ** error)55 thrift_buffered_transport_peek (ThriftTransport *transport, GError **error)
56 {
57   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
58   return (t->r_buf->len > 0) || thrift_transport_peek (t->transport, error);
59 }
60 
61 /* implements thrift_transport_open */
62 gboolean
thrift_buffered_transport_open(ThriftTransport * transport,GError ** error)63 thrift_buffered_transport_open (ThriftTransport *transport, GError **error)
64 {
65   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
66   return THRIFT_TRANSPORT_GET_CLASS (t->transport)->open (t->transport, error);
67 }
68 
69 /* implements thrift_transport_close */
70 gboolean
thrift_buffered_transport_close(ThriftTransport * transport,GError ** error)71 thrift_buffered_transport_close (ThriftTransport *transport, GError **error)
72 {
73   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
74   return THRIFT_TRANSPORT_GET_CLASS (t->transport)->close (t->transport, error);
75 }
76 
77 /* the actual read is "slow" because it calls the underlying transport */
78 gint32
thrift_buffered_transport_read_slow(ThriftTransport * transport,gpointer buf,guint32 len,GError ** error)79 thrift_buffered_transport_read_slow (ThriftTransport *transport, gpointer buf,
80                                      guint32 len, GError **error)
81 {
82   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
83   gint ret = 0;
84   guint32 want = len;
85   guint32 got = 0;
86   guchar *tmpdata = g_new0 (guchar, len);
87   guint32 have = t->r_buf->len;
88 
89 
90   /* we shouldn't hit this unless the buffer doesn't have enough to read */
91   g_assert (t->r_buf->len < want);
92 
93   /* first copy what we have in our buffer. */
94   if (have > 0)
95   {
96     memcpy (buf, t->r_buf, t->r_buf->len);
97     want -= t->r_buf->len;
98     t->r_buf = g_byte_array_remove_range (t->r_buf, 0, t->r_buf->len);
99   }
100 
101   /* if the buffer is still smaller than what we want to read, then just
102    * read it directly.  otherwise, fill the buffer and then give out
103    * enough to satisfy the read. */
104   if (t->r_buf_size < want)
105   {
106     if ((ret = THRIFT_TRANSPORT_GET_CLASS (t->transport)->read (t->transport,
107                                                                 tmpdata,
108                                                                 want,
109                                                                 error)) < 0) {
110       g_free (tmpdata);
111       return ret;
112     }
113     got += ret;
114 
115     /* copy the data starting from where we left off */
116     memcpy ((guint8 *)buf + have, tmpdata, got);
117     g_free (tmpdata);
118     return got + have;
119   } else {
120     guint32 give;
121 
122     if ((ret = THRIFT_TRANSPORT_GET_CLASS (t->transport)->read (t->transport,
123                                                                 tmpdata,
124                                                                 want,
125                                                                 error)) < 0) {
126       g_free (tmpdata);
127       return ret;
128     }
129     got += ret;
130     t->r_buf = g_byte_array_append (t->r_buf, tmpdata, got);
131     g_free (tmpdata);
132     /* hand over what we have up to what the caller wants */
133     give = want < t->r_buf->len ? want : t->r_buf->len;
134 
135 
136     memcpy ((guint8 *)buf + len - want, t->r_buf->data, give);
137     t->r_buf = g_byte_array_remove_range (t->r_buf, 0, give);
138     want -= give;
139 
140     return (len - want);
141   }
142 }
143 
144 /* implements thrift_transport_read */
145 gint32
thrift_buffered_transport_read(ThriftTransport * transport,gpointer buf,guint32 len,GError ** error)146 thrift_buffered_transport_read (ThriftTransport *transport, gpointer buf,
147                                 guint32 len, GError **error)
148 {
149   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
150   ThriftTransportClass *ttc = THRIFT_TRANSPORT_GET_CLASS (transport);
151   if(!ttc->checkReadBytesAvailable (transport, len, error))
152   {
153     return -1;
154   }
155 
156   /* if we have enough buffer data to fulfill the read, just use
157    * a memcpy */
158   if (len <= t->r_buf->len)
159   {
160     memcpy (buf, t->r_buf->data, len);
161     g_byte_array_remove_range (t->r_buf, 0, len);
162     return len;
163   }
164 
165   return thrift_buffered_transport_read_slow (transport, buf, len, error);
166 }
167 
168 /* implements thrift_transport_read_end
169  * called when write is complete.  nothing to do on our end. */
170 gboolean
thrift_buffered_transport_read_end(ThriftTransport * transport,GError ** error)171 thrift_buffered_transport_read_end (ThriftTransport *transport, GError **error)
172 {
173   /* satisfy -Wall */
174   THRIFT_UNUSED_VAR (transport);
175   THRIFT_UNUSED_VAR (error);
176   return TRUE;
177 }
178 
179 gboolean
thrift_buffered_transport_write_slow(ThriftTransport * transport,gpointer buf,guint32 len,GError ** error)180 thrift_buffered_transport_write_slow (ThriftTransport *transport, gpointer buf,
181                                       guint32 len, GError **error)
182 {
183   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
184   guint32 have_bytes = t->w_buf->len;
185   guint32 space = t->w_buf_size - t->w_buf->len;
186 
187   /* we need two syscalls because the buffered data plus the buffer itself
188    * is too big. */
189   if ((have_bytes + len >= 2*t->w_buf_size) || (have_bytes == 0))
190   {
191     if (have_bytes > 0)
192     {
193       if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
194                                                              t->w_buf->data,
195                                                              have_bytes,
196                                                              error)) {
197         return FALSE;
198       }
199       t->w_buf = g_byte_array_remove_range (t->w_buf, 0, have_bytes);
200     }
201     if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
202                                                            buf, len, error)) {
203       return FALSE;
204     }
205     return TRUE;
206   }
207 
208   t->w_buf = g_byte_array_append (t->w_buf, buf, space);
209   if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
210                                                          t->w_buf->data,
211                                                          t->w_buf->len,
212                                                          error)) {
213     return FALSE;
214   }
215 
216   t->w_buf = g_byte_array_remove_range (t->w_buf, 0, t->w_buf->len);
217   t->w_buf = g_byte_array_append (t->w_buf, (guint8 *)buf + space, len-space);
218 
219   return TRUE;
220 }
221 
222 /* implements thrift_transport_write */
223 gboolean
thrift_buffered_transport_write(ThriftTransport * transport,const gpointer buf,const guint32 len,GError ** error)224 thrift_buffered_transport_write (ThriftTransport *transport,
225                                  const gpointer buf,
226                                  const guint32 len, GError **error)
227 {
228   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
229 
230   /* the length of the current buffer plus the length of the data being read */
231   if (t->w_buf->len + len <= t->w_buf_size)
232   {
233     t->w_buf = g_byte_array_append (t->w_buf, buf, len);
234     return len;
235   }
236 
237   return thrift_buffered_transport_write_slow (transport, buf, len, error);
238 }
239 
240 /* implements thrift_transport_write_end
241  * called when write is complete.  nothing to do on our end. */
242 gboolean
thrift_buffered_transport_write_end(ThriftTransport * transport,GError ** error)243 thrift_buffered_transport_write_end (ThriftTransport *transport, GError **error)
244 {
245   /* satisfy -Wall */
246   THRIFT_UNUSED_VAR (transport);
247   THRIFT_UNUSED_VAR (error);
248   return TRUE;
249 }
250 
251 /* implements thrift_transport_flush */
252 gboolean
thrift_buffered_transport_flush(ThriftTransport * transport,GError ** error)253 thrift_buffered_transport_flush (ThriftTransport *transport, GError **error)
254 {
255   ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
256   ThriftTransportClass *ttc = THRIFT_TRANSPORT_GET_CLASS (transport);
257 
258   if(!ttc->resetConsumedMessageSize (transport, -1, error))
259   {
260     return FALSE;
261   }
262 
263   if (t->w_buf != NULL && t->w_buf->len > 0)
264   {
265     /* write the buffer and then empty it */
266     if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
267                                                            t->w_buf->data,
268                                                            t->w_buf->len,
269                                                            error)) {
270       return FALSE;
271     }
272     t->w_buf = g_byte_array_remove_range (t->w_buf, 0, t->w_buf->len);
273   }
274   THRIFT_TRANSPORT_GET_CLASS (t->transport)->flush (t->transport,
275                                                     error);
276 
277   return TRUE;
278 }
279 
280 /* initializes the instance */
281 static void
thrift_buffered_transport_init(ThriftBufferedTransport * transport)282 thrift_buffered_transport_init (ThriftBufferedTransport *transport)
283 {
284   transport->transport = NULL;
285   transport->r_buf = g_byte_array_new ();
286   transport->w_buf = g_byte_array_new ();
287 }
288 
289 /* destructor */
290 static void
thrift_buffered_transport_finalize(GObject * object)291 thrift_buffered_transport_finalize (GObject *object)
292 {
293   ThriftBufferedTransport *transport = THRIFT_BUFFERED_TRANSPORT (object);
294 
295   if (transport->r_buf != NULL)
296   {
297     g_byte_array_free (transport->r_buf, TRUE);
298   }
299   transport->r_buf = NULL;
300 
301   if (transport->w_buf != NULL)
302   {
303     g_byte_array_free (transport->w_buf, TRUE);
304   }
305   transport->w_buf = NULL;
306 }
307 
308 /* property accessor */
309 void
thrift_buffered_transport_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)310 thrift_buffered_transport_get_property (GObject *object, guint property_id,
311                                         GValue *value, GParamSpec *pspec)
312 {
313   ThriftBufferedTransport *transport = THRIFT_BUFFERED_TRANSPORT (object);
314 
315   ThriftTransport *tt = THRIFT_TRANSPORT (object);
316 
317   THRIFT_UNUSED_VAR (pspec);
318 
319   switch (property_id)
320   {
321     case PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT:
322       g_value_set_object (value, transport->transport);
323       break;
324     case PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE:
325       g_value_set_uint (value, transport->r_buf_size);
326       break;
327     case PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE:
328       g_value_set_uint (value, transport->w_buf_size);
329       break;
330     case PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION:
331       g_value_set_object (value, tt->configuration);
332       break;
333     case PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE:
334       g_value_set_long (value, tt->remainingMessageSize_);
335       break;
336     case PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE:
337       g_value_set_long (value, tt->knowMessageSize_);
338       break;
339   }
340 }
341 
342 /* property mutator */
343 void
thrift_buffered_transport_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)344 thrift_buffered_transport_set_property (GObject *object, guint property_id,
345                                         const GValue *value, GParamSpec *pspec)
346 {
347   ThriftBufferedTransport *transport = THRIFT_BUFFERED_TRANSPORT (object);
348 
349   ThriftTransport *tt = THRIFT_TRANSPORT (object);
350 
351   THRIFT_UNUSED_VAR (pspec);
352 
353   switch (property_id)
354   {
355     case PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT:
356       transport->transport = g_value_get_object (value);
357       break;
358     case PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE:
359       transport->r_buf_size = g_value_get_uint (value);
360       break;
361     case PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE:
362       transport->w_buf_size = g_value_get_uint (value);
363       break;
364     case PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION:
365       tt->configuration = g_value_dup_object (value);
366       break;
367     case PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE:
368       tt->remainingMessageSize_ = g_value_get_long (value);
369       break;
370     case PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE:
371       tt->knowMessageSize_ = g_value_get_long (value);
372       break;
373   }
374 }
375 
376 /* initializes the class */
377 static void
thrift_buffered_transport_class_init(ThriftBufferedTransportClass * cls)378 thrift_buffered_transport_class_init (ThriftBufferedTransportClass *cls)
379 {
380   ThriftTransportClass *ttc = THRIFT_TRANSPORT_CLASS (cls);
381   GObjectClass *gobject_class = G_OBJECT_CLASS (cls);
382   GParamSpec *param_spec = NULL;
383 
384   /* setup accessors and mutators */
385   gobject_class->get_property = thrift_buffered_transport_get_property;
386   gobject_class->set_property = thrift_buffered_transport_set_property;
387 
388   param_spec = g_param_spec_object ("transport", "transport (construct)",
389                                     "Thrift transport",
390                                     THRIFT_TYPE_TRANSPORT,
391                                     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
392   g_object_class_install_property (gobject_class,
393                                    PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT,
394                                    param_spec);
395 
396   param_spec = g_param_spec_uint ("r_buf_size",
397                                   "read buffer size (construct)",
398                                   "Set the read buffer size",
399                                   0, /* min */
400                                   1048576, /* max, 1024*1024 */
401                                   512, /* default value */
402                                   G_PARAM_CONSTRUCT_ONLY |
403                                   G_PARAM_READWRITE);
404   g_object_class_install_property (gobject_class,
405                                    PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE,
406                                    param_spec);
407 
408   param_spec = g_param_spec_uint ("w_buf_size",
409                                   "write buffer size (construct)",
410                                   "Set the write buffer size",
411                                   0, /* min */
412                                   1048576, /* max, 1024*1024 */
413                                   512, /* default value */
414                                   G_PARAM_CONSTRUCT_ONLY |
415                                   G_PARAM_READWRITE);
416   g_object_class_install_property (gobject_class,
417                                    PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE,
418                                    param_spec);
419 
420   param_spec = g_param_spec_object ("configuration",
421                                     "configuration (construct)",
422                                     "Thrift Configuration",
423                                     THRIFT_TYPE_CONFIGURATION,
424                                     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
425   g_object_class_install_property (gobject_class,
426                                    PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION,
427                                    param_spec);
428 
429   param_spec = g_param_spec_long ("remainingmessagesize",
430                                   "remainingmessagesize (construct)",
431                                   "Set the remaining message size",
432                                   0, /* min */
433                                   G_MAXINT32, /* max */
434                                   DEFAULT_MAX_MESSAGE_SIZE, /* default by construct */
435                                   G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
436   g_object_class_install_property (gobject_class,
437                                    PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE,
438                                    param_spec);
439 
440   param_spec = g_param_spec_long ("knowmessagesize",
441                                   "knowmessagesize (construct)",
442                                   "Set the known size of the message",
443                                   0, /* min */
444                                   G_MAXINT32, /* max */
445                                   DEFAULT_MAX_MESSAGE_SIZE, /* default by construct */
446                                   G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
447   g_object_class_install_property (gobject_class,
448                                    PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE,
449                                    param_spec);
450 
451   gobject_class->finalize = thrift_buffered_transport_finalize;
452   ttc->is_open = thrift_buffered_transport_is_open;
453   ttc->peek = thrift_buffered_transport_peek;
454   ttc->open = thrift_buffered_transport_open;
455   ttc->close = thrift_buffered_transport_close;
456   ttc->read = thrift_buffered_transport_read;
457   ttc->read_end = thrift_buffered_transport_read_end;
458   ttc->write = thrift_buffered_transport_write;
459   ttc->write_end = thrift_buffered_transport_write_end;
460   ttc->flush = thrift_buffered_transport_flush;
461 }
462