1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <stdint.h>
5 #include <errno.h>
6 
7 #include <unistd.h>
8 #include <sys/uio.h>
9 #include <sys/time.h>
10 
11 #include "amqp.h"
12 #include "amqp_framing.h"
13 #include "amqp_private.h"
14 
15 #include <assert.h>
16 
/* Initial sizes, in bytes, applied by amqp_new_connection(). */
/* Frame pool size argument; also passed as the initial frame_max. */
#define INITIAL_FRAME_POOL_PAGE_SIZE (64 * 1024)
/* Size argument used when initialising the decoding pool. */
#define INITIAL_DECODING_POOL_PAGE_SIZE (128 * 1024)
/* Size of the malloc'ed socket inbound buffer. */
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE (128 * 1024)
20 
/*
 * Assert (through amqp_assert) that the connection `statevec` is currently
 * in state `statenum`.  Each argument is evaluated exactly once.
 *
 * The body is wrapped in do { ... } while (0) so that the macro expands to a
 * single statement: `if (x) ENFORCE_STATE(s, n); else ...` is then valid,
 * which it is not with a bare `{ ... }` block followed by a semicolon.
 */
#define ENFORCE_STATE(statevec, statenum)				\
  do {									\
    amqp_connection_state_t _check_state = (statevec);			\
    int _wanted_state = (statenum);					\
    amqp_assert(_check_state->state == _wanted_state,			\
		"Programming error: invalid AMQP connection state: expected %d, got %d", \
		_wanted_state,						\
		_check_state->state);					\
  } while (0)
30 
amqp_new_connection(void)31 amqp_connection_state_t amqp_new_connection(void) {
32   amqp_connection_state_t state =
33     (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
34 
35   if (state == NULL) {
36     return NULL;
37   }
38 
39   init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
40   init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
41 
42   state->state = CONNECTION_STATE_IDLE;
43 
44   state->inbound_buffer.bytes = NULL;
45   state->outbound_buffer.bytes = NULL;
46   if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
47     empty_amqp_pool(&state->frame_pool);
48     empty_amqp_pool(&state->decoding_pool);
49     free(state);
50     return NULL;
51   }
52 
53   state->inbound_offset = 0;
54   state->target_size = HEADER_SIZE;
55 
56   state->sockfd = -1;
57   state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
58   state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
59   if (state->sock_inbound_buffer.bytes == NULL) {
60     amqp_destroy_connection(state);
61     return NULL;
62   }
63 
64   state->sock_inbound_offset = 0;
65   state->sock_inbound_limit = 0;
66 
67   state->first_queued_frame = NULL;
68   state->last_queued_frame = NULL;
69 
70   return state;
71 }
72 
/* Return the socket descriptor currently stored on the connection. */
int amqp_get_sockfd(amqp_connection_state_t state)
{
  int const fd = state->sockfd;

  return fd;
}
76 
/*
 * Store `sockfd` on the connection as its socket descriptor.
 * Nothing is opened or closed here; only the stored value changes.
 */
void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) {
  state->sockfd = sockfd;
}
82 
/*
 * Record tuning parameters on an idle connection and resize its frame pool
 * and outbound buffer to match frame_max.
 *
 * The connection must be in CONNECTION_STATE_IDLE (enforced by assertion).
 *
 * Returns 0 on success.  If the outbound buffer cannot be resized the
 * connection is destroyed with amqp_destroy_connection() and -ENOMEM is
 * returned; the caller must not use `state` after that.
 */
int amqp_tune_connection(amqp_connection_state_t state,
                         int channel_max,
                         int frame_max,
                         int heartbeat)
{
  void *resized;

  ENFORCE_STATE(state, CONNECTION_STATE_IDLE);

  state->heartbeat = heartbeat;
  state->frame_max = frame_max;
  state->channel_max = channel_max;

  /* Throw away the old frame pool and re-create it using frame_max as its
     size argument. */
  empty_amqp_pool(&state->frame_pool);
  init_amqp_pool(&state->frame_pool, frame_max);

  state->outbound_buffer.len = frame_max;
  state->inbound_buffer.len = frame_max;

  /* Keep the old pointer until realloc succeeds so that the destructor can
     still free it on the failure path. */
  resized = realloc(state->outbound_buffer.bytes, frame_max);
  if (resized != NULL) {
    state->outbound_buffer.bytes = resized;
    return 0;
  }

  amqp_destroy_connection(state);
  return -ENOMEM;
}
110 
/* Return the channel_max value currently stored on the connection. */
int amqp_get_channel_max(amqp_connection_state_t state)
{
  int const channel_max = state->channel_max;

  return channel_max;
}
114 
/*
 * Release all memory owned by a connection: both memory pools, the outbound
 * buffer, the socket inbound buffer and the state object itself.
 *
 * The socket descriptor is not closed here.  Passing NULL is a no-op,
 * mirroring free(), so error paths can call this unconditionally.
 * `state` must not be used after this call.
 */
void amqp_destroy_connection(amqp_connection_state_t state) {
  if (state == NULL) {
    return;
  }

  empty_amqp_pool(&state->frame_pool);
  empty_amqp_pool(&state->decoding_pool);
  free(state->outbound_buffer.bytes);
  free(state->sock_inbound_buffer.bytes);
  free(state);
}
122 
/*
 * Reset the inbound state machine so the next input starts a new frame:
 * the connection becomes idle, expects HEADER_SIZE bytes next, and drops
 * its pointer to the current inbound buffer (the buffer is not freed here).
 */
static void return_to_idle(amqp_connection_state_t state)
{
  state->state = CONNECTION_STATE_IDLE;
  state->target_size = HEADER_SIZE;
  state->inbound_offset = 0;
  state->inbound_buffer.bytes = NULL;
}
129 
/*
 * Register the callback `f` and its opaque argument `data` on the
 * connection.  amqp_handle_input() invokes the callback when it decodes a
 * method frame whose id is AMQP_BASIC_RETURN_METHOD; a NULL `f` means no
 * callback is invoked.
 */
void amqp_set_basic_return_cb(amqp_connection_state_t state,
                              amqp_basic_return_fn_t f,
                              void *data)
{
  state->basic_return_callback_data = data;
  state->basic_return_callback = f;
}
/*
 * Feed bytes received from the peer into the connection's frame decoder.
 *
 * Input is accumulated in state->inbound_buffer across calls.  At most one
 * frame is decoded per call; the function returns as soon as a frame (or a
 * protocol header pseudo-frame) is complete or the input runs out.
 *
 * Parameters:
 *   state          connection whose inbound state machine is advanced.
 *   received_data  bytes to consume.
 *   decoded_frame  output.  frame_type is left at 0 when no frame was
 *                  produced: either more input is needed or a complete
 *                  frame of an unrecognised type was read and ignored.
 *                  A body fragment refers to the connection's inbound
 *                  buffer rather than to a copy.
 *
 * Returns the number of bytes consumed from received_data, which may be
 * smaller than received_data.len (call again with the remainder), or a
 * negative value on error:
 *   -ENOMEM  the inbound frame buffer could not be allocated;
 *   -EINVAL  malformed frame (size does not fit the inbound buffer, frame
 *            too short for its type, or bad frame-end marker).
 * The decoding macros and AMQP_CHECK_RESULT presumably propagate other
 * negative results from the helpers they wrap.
 *
 * If a basic.return method frame is decoded and a callback was registered
 * with amqp_set_basic_return_cb(), the callback is invoked before returning.
 */
int amqp_handle_input(amqp_connection_state_t state,
                      amqp_bytes_t received_data,
                      amqp_frame_t *decoded_frame)
{
  int total_bytes_consumed = 0;
  int bytes_consumed;

  /* Returning frame_type of zero indicates either insufficient input,
     or a complete, ignored frame was read. */
  decoded_frame->frame_type = 0;

 read_more:
  if (received_data.len == 0) {
    return total_bytes_consumed;
  }

  if (state->state == CONNECTION_STATE_IDLE) {
    /* Start of a new frame: obtain a buffer of inbound_buffer.len bytes. */
    state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
    if (state->inbound_buffer.bytes == NULL) {
      /* Remain idle so that a later call can retry the allocation instead
         of copying into a NULL buffer. */
      return -ENOMEM;
    }
    state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
  }

  /* Take only as much as the current header/body still needs. */
  bytes_consumed = state->target_size - state->inbound_offset;
  if (received_data.len < bytes_consumed) {
    bytes_consumed = received_data.len;
  }

  E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
  state->inbound_offset += bytes_consumed;
  total_bytes_consumed += bytes_consumed;

  assert(state->inbound_offset <= state->target_size);

  if (state->inbound_offset < state->target_size) {
    return total_bytes_consumed;
  }

  switch (state->state) {
    case CONNECTION_STATE_WAITING_FOR_HEADER:
      if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
          D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
      {
        state->target_size = 8;
        state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
      } else {
        uint32_t payload_size = D_32(state->inbound_buffer, 3);
        uint64_t frame_size = (uint64_t) payload_size + HEADER_SIZE + FOOTER_SIZE;

        /* The size field comes straight from the peer.  Reject anything
           that cannot fit in the inbound buffer; doing the sum in 64 bits
           also catches 32-bit wrap-around, which would otherwise yield a
           target_size smaller than the bytes already read. */
        if (frame_size > (uint64_t) state->inbound_buffer.len) {
          return -EINVAL;
        }

        state->target_size = payload_size + HEADER_SIZE + FOOTER_SIZE;
        state->state = CONNECTION_STATE_WAITING_FOR_BODY;
      }

      /* Wind buffer forward, and try to read some body out of it. */
      received_data.len -= bytes_consumed;
      received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
      goto read_more;

    case CONNECTION_STATE_WAITING_FOR_BODY: {
      int frame_type = D_8(state->inbound_buffer, 0);

#if 0
      printf("recving:\n");
      amqp_dump(state->inbound_buffer.bytes, state->target_size);
#endif

      /* Check frame end marker (footer) */
      if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
        return -EINVAL;
      }

      decoded_frame->channel = D_16(state->inbound_buffer, 1);

      switch (frame_type) {
        case AMQP_FRAME_METHOD: {
          amqp_bytes_t encoded;

          /* A method frame must at least hold its 4-byte method ID;
             otherwise the length computed below would underflow. */
          if (state->target_size < HEADER_SIZE + 4 + FOOTER_SIZE) {
            return -EINVAL;
          }

          /* Four bytes of method ID before the method args. */
          encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
          encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);

          decoded_frame->frame_type = AMQP_FRAME_METHOD;
          decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
          AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
                                               &state->decoding_pool,
                                               encoded,
                                               &decoded_frame->payload.method.decoded));
          break;
        }

        case AMQP_FRAME_HEADER: {
          amqp_bytes_t encoded;

          /* A content header frame must at least hold the 12-byte
             properties header; otherwise the length would underflow. */
          if (state->target_size < HEADER_SIZE + 12 + FOOTER_SIZE) {
            return -EINVAL;
          }

          /* 12 bytes for properties header. */
          encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
          encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);

          decoded_frame->frame_type = AMQP_FRAME_HEADER;
          decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
          decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
          AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
                                                   &state->decoding_pool,
                                                   encoded,
                                                   &decoded_frame->payload.properties.decoded));
          break;
        }

        case AMQP_FRAME_BODY: {
          /* Cannot underflow: target_size was validated to be at least
             HEADER_SIZE + FOOTER_SIZE when the frame header was read. */
          size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);

          decoded_frame->frame_type = AMQP_FRAME_BODY;
          decoded_frame->payload.body_fragment.len = fragment_len;
          decoded_frame->payload.body_fragment.bytes =
            D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
          break;
        }

        case AMQP_FRAME_HEARTBEAT:
          decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
          break;

        default:
          /* Ignore the frame by not changing frame_type away from 0. */
          break;
      }

      return_to_idle(state);

      /* Hand basic.return methods to the registered callback, if any. */
      if (decoded_frame->frame_type == AMQP_FRAME_METHOD &&
          decoded_frame->payload.method.id == AMQP_BASIC_RETURN_METHOD) {
        amqp_basic_return_t *m = decoded_frame->payload.method.decoded;
        if (state->basic_return_callback) {
          state->basic_return_callback(decoded_frame->channel, m,
                                       state->basic_return_callback_data);
        }
      }

      return total_bytes_consumed;
    }

    case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
      decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
      decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
      amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
                  "Invalid protocol header received");
      decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
      decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
      decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
      decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);

      return_to_idle(state);
      return total_bytes_consumed;

    default:
      amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
  }

  /* Only reached if amqp_assert() returns; ensures every path of this
     non-void function returns a defined value. */
  return -EINVAL;
}
286 
amqp_release_buffers_ok(amqp_connection_state_t state)287 amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
288   return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
289 }
290 
/*
 * Recycle the connection's frame pool and decoding pool.
 *
 * Preconditions, both enforced by assertion: the connection is idle and no
 * frames are queued.  Use amqp_release_buffers_ok() to test them first, or
 * call amqp_maybe_release_buffers() instead.
 */
void amqp_release_buffers(amqp_connection_state_t state)
{
  ENFORCE_STATE(state, CONNECTION_STATE_IDLE);

  amqp_assert(NULL == state->first_queued_frame,
              "Programming error: attempt to amqp_release_buffers while waiting events enqueued");

  recycle_amqp_pool(&state->frame_pool);
  recycle_amqp_pool(&state->decoding_pool);
}
300 
/*
 * Release the connection's buffers only if amqp_release_buffers_ok() says
 * that is currently allowed; otherwise do nothing.
 */
void amqp_maybe_release_buffers(amqp_connection_state_t state)
{
  if (!amqp_release_buffers_ok(state)) {
    return;
  }

  amqp_release_buffers(state);
}
306 
/*
 * Serialise `frame` into state->outbound_buffer in preparation for sending.
 *
 * The frame type, channel and payload length are always written to the
 * start of the outbound buffer.  For method, header and heartbeat frames
 * the payload and the frame-end marker follow in the same buffer.  For body
 * frames only the frame header is written; *encoded is set to the caller's
 * body fragment, which must be sent separately, followed by the frame-end
 * byte.
 *
 * Outputs:
 *   *encoded      payload region (inside the outbound buffer, or the body
 *                 fragment for body frames).
 *   *payload_len  value written into the frame's size field.
 *
 * Returns 0 if the complete frame is in the outbound buffer, 1 if the body
 * has to be sent separately, or -EINVAL for an unknown frame type.  The
 * encoding macros presumably propagate other negative results.
 *
 * When a heartbeat is configured, state->last_send is updated to the
 * current time.
 */
static int inner_send_frame(amqp_connection_state_t state,
                            amqp_frame_t const *frame,
                            amqp_bytes_t *encoded,
                            int *payload_len)
{
  int body_is_separate = 0;

  E_8(state->outbound_buffer, 0, frame->frame_type);
  E_16(state->outbound_buffer, 1, frame->channel);

  switch (frame->frame_type) {
    case AMQP_FRAME_METHOD:
      /* Method ID, then the encoded arguments in the rest of the buffer. */
      E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
      encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
      encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
      *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
                                                          frame->payload.method.decoded,
                                                          *encoded)) + 4;
      break;

    case AMQP_FRAME_HEADER:
      /* Class ID, weight and body size, then the encoded properties. */
      E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
      E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
      E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
      encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
      encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
      *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
                                                              frame->payload.properties.decoded,
                                                              *encoded)) + 12;
      break;

    case AMQP_FRAME_BODY:
      /* The body is not copied; the caller sends it from its own buffer. */
      *encoded = frame->payload.body_fragment;
      *payload_len = encoded->len;
      body_is_separate = 1;
      break;

    case AMQP_FRAME_HEARTBEAT:
      *encoded = AMQP_EMPTY_BYTES;
      *payload_len = 0;
      break;

    default:
      return -EINVAL;
  }

  /* Fill in the size field now that the payload length is known. */
  E_32(state->outbound_buffer, 3, *payload_len);
  if (body_is_separate == 0) {
    E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
  }

#if 0
  if (body_is_separate) {
    printf("sending body frame (header):\n");
    amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
    printf("sending body frame (payload):\n");
    amqp_dump(encoded->bytes, *payload_len);
  } else {
    printf("sending:\n");
    amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
  }
#endif

  if (state->heartbeat) {
    gettimeofday(&state->last_send, NULL);
  }

  return body_is_separate;
}
375 
/*
 * Write exactly `len` bytes from `buf` to descriptor `fd`.
 *
 * write() may transfer fewer bytes than requested or be interrupted by a
 * signal; this helper resumes after short writes and retries on EINTR so
 * that a frame is never left half-sent on a successful return.
 *
 * Returns 0 once everything has been written, or -1 on failure with errno
 * describing the error.
 */
static int amqp_send_frame_write_all(int fd, const void *buf, size_t len)
{
  const char *cursor = buf;

  while (len > 0) {
    ssize_t written = write(fd, cursor, len);

    if (written < 0) {
      if (errno == EINTR) {
        continue;
      }
      return -1;
    }
    if (written == 0) {
      /* No progress and no error reported: fail rather than spin forever. */
      errno = EIO;
      return -1;
    }

    cursor += written;
    len -= (size_t) written;
  }

  return 0;
}

/*
 * Encode `frame` and write it to the connection's socket (state->sockfd).
 *
 * Method, header and heartbeat frames are written from the outbound buffer
 * in one piece.  Body frames are written as three pieces: the frame header
 * from the outbound buffer, the caller's body fragment, and the frame-end
 * byte.
 *
 * Returns 0 on success, or a negative value if encoding fails (for example
 * -EINVAL for an unknown frame type) or a socket write fails.
 */
int amqp_send_frame(amqp_connection_state_t state,
                    amqp_frame_t const *frame)
{
  amqp_bytes_t encoded;
  int payload_len;
  int separate_body;

  separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
  switch (separate_body) {
    case 0:
      AMQP_CHECK_RESULT(amqp_send_frame_write_all(state->sockfd,
                                                  state->outbound_buffer.bytes,
                                                  payload_len + (HEADER_SIZE + FOOTER_SIZE)));
      return 0;

    case 1:
      AMQP_CHECK_RESULT(amqp_send_frame_write_all(state->sockfd,
                                                  state->outbound_buffer.bytes,
                                                  HEADER_SIZE));
      AMQP_CHECK_RESULT(amqp_send_frame_write_all(state->sockfd,
                                                  encoded.bytes,
                                                  payload_len));
      {
        unsigned char frame_end_byte = AMQP_FRAME_END;
        assert(FOOTER_SIZE == 1);
        AMQP_CHECK_RESULT(amqp_send_frame_write_all(state->sockfd,
                                                    &frame_end_byte,
                                                    FOOTER_SIZE));
      }
      return 0;

    default:
      /* Negative error code from inner_send_frame(). */
      return separate_body;
  }
}
405 
/*
 * Encode `frame` and hand the resulting bytes to the output callback `fn`
 * instead of writing them to the connection's socket.
 *
 * `fn` is called as fn(context, bytes, length): once with the whole frame
 * for method, header and heartbeat frames, or three times for body frames
 * (frame header, body fragment, frame-end byte).
 *
 * Returns 0 on success, or the negative value produced by the encoder.
 * Results of `fn` are passed through AMQP_CHECK_RESULT, which presumably
 * returns early on a negative value.
 */
int amqp_send_frame_to(amqp_connection_state_t state,
                       amqp_frame_t const *frame,
                       amqp_output_fn_t fn,
                       void *context)
{
  amqp_bytes_t payload;
  int payload_size;
  unsigned char frame_end_byte = AMQP_FRAME_END;
  int rc = inner_send_frame(state, frame, &payload, &payload_size);

  if (rc == 0) {
    /* Entire frame already sits in the outbound buffer. */
    AMQP_CHECK_RESULT(fn(context,
                         state->outbound_buffer.bytes,
                         payload_size + (HEADER_SIZE + FOOTER_SIZE)));
    return 0;
  }

  if (rc != 1) {
    /* Error code from inner_send_frame(). */
    return rc;
  }

  /* Body frame: header, then the caller's fragment, then the end marker. */
  AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
  AMQP_CHECK_RESULT(fn(context, payload.bytes, payload_size));
  assert(FOOTER_SIZE == 1);
  AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
  return 0;
}
437