1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <stdint.h>
5 #include <errno.h>
6
7 #include <unistd.h>
8 #include <sys/uio.h>
9 #include <sys/time.h>
10
11 #include "amqp.h"
12 #include "amqp_framing.h"
13 #include "amqp_private.h"
14
15 #include <assert.h>
16
/* Page size used when the frame pool of a new connection is first set up;
   amqp_new_connection also passes it as the initial frame_max. */
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
/* Page size of the decoding pool, the pool that is handed to the method and
   properties decoders. */
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
/* Size in bytes of the socket read buffer allocated for each new connection. */
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
20
/*
 * Assert that connection `statevec` is currently in state `statenum`.
 *
 * Each argument is evaluated exactly once. On a mismatch amqp_assert is
 * invoked with a message naming the expected and the actual state.
 *
 * The expansion is wrapped in do { ... } while (0) so that it forms a single
 * statement and remains correct as the body of an unbraced if/else; invoke
 * it with a trailing semicolon.
 */
#define ENFORCE_STATE(statevec, statenum) \
  do { \
    amqp_connection_state_t _check_state = (statevec); \
    int _wanted_state = (statenum); \
    amqp_assert(_check_state->state == _wanted_state, \
                "Programming error: invalid AMQP connection state: expected %d, got %d", \
                _wanted_state, \
                _check_state->state); \
  } while (0)
30
amqp_new_connection(void)31 amqp_connection_state_t amqp_new_connection(void) {
32 amqp_connection_state_t state =
33 (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
34
35 if (state == NULL) {
36 return NULL;
37 }
38
39 init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
40 init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
41
42 state->state = CONNECTION_STATE_IDLE;
43
44 state->inbound_buffer.bytes = NULL;
45 state->outbound_buffer.bytes = NULL;
46 if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
47 empty_amqp_pool(&state->frame_pool);
48 empty_amqp_pool(&state->decoding_pool);
49 free(state);
50 return NULL;
51 }
52
53 state->inbound_offset = 0;
54 state->target_size = HEADER_SIZE;
55
56 state->sockfd = -1;
57 state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
58 state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
59 if (state->sock_inbound_buffer.bytes == NULL) {
60 amqp_destroy_connection(state);
61 return NULL;
62 }
63
64 state->sock_inbound_offset = 0;
65 state->sock_inbound_limit = 0;
66
67 state->first_queued_frame = NULL;
68 state->last_queued_frame = NULL;
69
70 return state;
71 }
72
/* Return the socket file descriptor currently stored on `state`. */
int amqp_get_sockfd(amqp_connection_state_t state)
{
  int fd = state->sockfd;

  return fd;
}
76
/* Store `sockfd` on `state` as the connection's socket file descriptor.
   Any previously stored descriptor is overwritten, not closed. */
void amqp_set_sockfd(amqp_connection_state_t state, int sockfd)
{
  state->sockfd = sockfd;
}
82
/*
 * Apply tuning parameters to a connection.
 *
 * state       - connection to tune; must be in CONNECTION_STATE_IDLE
 *               (checked with ENFORCE_STATE).
 * channel_max - stored on the connection.
 * frame_max   - stored on the connection; also becomes the page size of the
 *               frame pool and the size of the inbound and outbound frame
 *               buffers. Must be positive.
 * heartbeat   - stored on the connection; when nonzero, sending a frame
 *               records a timestamp.
 *
 * Returns 0 on success, -EINVAL when frame_max is not positive, or -ENOMEM
 * when the outbound buffer cannot be resized. On failure the connection is
 * left exactly as it was and is still owned by the caller, who remains
 * responsible for destroying it.
 */
int amqp_tune_connection(amqp_connection_state_t state,
                         int channel_max,
                         int frame_max,
                         int heartbeat)
{
  void *newbuf;

  ENFORCE_STATE(state, CONNECTION_STATE_IDLE);

  /* realloc with a size of zero may free the buffer and return NULL, and a
     negative value would turn into an enormous size_t; reject both. */
  if (frame_max <= 0) {
    return -EINVAL;
  }

  /* Allocate first and commit afterwards, so a failed allocation cannot
     leave the recorded sizes out of step with the real buffer. */
  newbuf = realloc(state->outbound_buffer.bytes, (size_t) frame_max);
  if (newbuf == NULL) {
    return -ENOMEM;
  }
  state->outbound_buffer.bytes = newbuf;
  state->outbound_buffer.len = frame_max;
  state->inbound_buffer.len = frame_max;

  state->channel_max = channel_max;
  state->frame_max = frame_max;
  state->heartbeat = heartbeat;

  empty_amqp_pool(&state->frame_pool);
  init_amqp_pool(&state->frame_pool, frame_max);

  return 0;
}
110
/* Return the channel_max value currently stored on `state`. */
int amqp_get_channel_max(amqp_connection_state_t state)
{
  int limit = state->channel_max;

  return limit;
}
114
/*
 * Release a connection object and the memory it owns: both pools, the
 * outbound frame buffer, the socket read buffer and the object itself.
 *
 * The stored socket descriptor is not closed here.
 *
 * Passing NULL is allowed and does nothing, mirroring free(); this keeps
 * cleanup paths simple when amqp_new_connection() has returned NULL.
 */
void amqp_destroy_connection(amqp_connection_state_t state)
{
  if (state == NULL) {
    return;
  }

  empty_amqp_pool(&state->frame_pool);
  empty_amqp_pool(&state->decoding_pool);
  free(state->outbound_buffer.bytes);
  free(state->sock_inbound_buffer.bytes);
  free(state);
}
122
/* Reset the inbound decoder: mark the connection idle, forget the current
   inbound frame buffer and expect a fresh frame header next. */
static void return_to_idle(amqp_connection_state_t state)
{
  state->state = CONNECTION_STATE_IDLE;

  state->target_size = HEADER_SIZE;
  state->inbound_offset = 0;
  state->inbound_buffer.bytes = NULL;
}
129
/*
 * Register the callback that amqp_handle_input invokes when it decodes a
 * basic.return method frame.
 *
 * f    - callback to store; a NULL value means no callback is invoked.
 * data - opaque pointer stored alongside and handed back to the callback.
 */
void amqp_set_basic_return_cb(amqp_connection_state_t state,
                              amqp_basic_return_fn_t f,
                              void *data)
{
  state->basic_return_callback_data = data;
  state->basic_return_callback = f;
}
/*
 * Feed raw bytes received from the peer into the inbound frame decoder.
 *
 * state         - connection whose inbound buffer and decoder state are
 *                 advanced.
 * received_data - bytes to consume; may contain less or more than one frame.
 * decoded_frame - output. frame_type is 0 when this call produced no frame
 *                 (more input is needed, or a complete frame of an
 *                 unrecognised type was read and ignored). Otherwise it
 *                 describes the decoded frame. A body fragment points into
 *                 the inbound buffer, which is allocated from the frame
 *                 pool; decoded methods and properties are produced by the
 *                 decoders using the decoding pool.
 *
 * At most one frame is decoded per call. Returns the number of bytes taken
 * from received_data (the caller passes the remainder in again), or a
 * negative value on error: -ENOMEM when no inbound buffer can be allocated,
 * -EINVAL for a frame that is malformed (bad end marker, declared size larger
 * than the inbound buffer, or too short for its type), or whatever the
 * decoders report through AMQP_CHECK_RESULT.
 * NOTE(review): the E_ and D_ buffer macros may also return early on their
 * own; confirm against amqp_private.h.
 *
 * After an error the decoder state is left untouched, so the same error is
 * reported again on the next call.
 *
 * When a basic.return method frame is decoded and a callback has been
 * registered with amqp_set_basic_return_cb, the callback is invoked before
 * returning.
 */
int amqp_handle_input(amqp_connection_state_t state,
                      amqp_bytes_t received_data,
                      amqp_frame_t *decoded_frame)
{
  int total_bytes_consumed = 0;
  int bytes_consumed;

  /* Returning frame_type of zero indicates either insufficient input,
     or a complete, ignored frame was read. */
  decoded_frame->frame_type = 0;

 read_more:
  if (received_data.len == 0) {
    return total_bytes_consumed;
  }

  if (state->state == CONNECTION_STATE_IDLE) {
    void *frame_buf = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);

    /* Stay idle when no buffer is available, so nothing is ever copied
       through a NULL pointer and a later call can try again. */
    if (frame_buf == NULL) {
      return -ENOMEM;
    }
    state->inbound_buffer.bytes = frame_buf;
    state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
  }

  bytes_consumed = state->target_size - state->inbound_offset;
  if (received_data.len < bytes_consumed) {
    bytes_consumed = received_data.len;
  }

  E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
  state->inbound_offset += bytes_consumed;
  total_bytes_consumed += bytes_consumed;

  assert(state->inbound_offset <= state->target_size);

  if (state->inbound_offset < state->target_size) {
    return total_bytes_consumed;
  }

  switch (state->state) {
    case CONNECTION_STATE_WAITING_FOR_HEADER:
      if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
          D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
      {
        state->target_size = 8;
        state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
      } else {
        uint32_t payload_size = D_32(state->inbound_buffer, 3);

        /* The size field comes straight from the peer. A frame that cannot
           fit into the inbound buffer is rejected up front; the sum is
           formed in 64 bits so it cannot wrap. */
        if ((uint64_t) payload_size + HEADER_SIZE + FOOTER_SIZE >
            (uint64_t) state->inbound_buffer.len) {
          return -EINVAL;
        }

        state->target_size = payload_size + HEADER_SIZE + FOOTER_SIZE;
        state->state = CONNECTION_STATE_WAITING_FOR_BODY;
      }

      /* Wind buffer forward, and try to read some body out of it. */
      received_data.len -= bytes_consumed;
      received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
      goto read_more;

    case CONNECTION_STATE_WAITING_FOR_BODY: {
      int frame_type = D_8(state->inbound_buffer, 0);

#if 0
      printf("recving:\n");
      amqp_dump(state->inbound_buffer.bytes, state->target_size);
#endif

      /* Check frame end marker (footer) */
      if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
        return -EINVAL;
      }

      decoded_frame->channel = D_16(state->inbound_buffer, 1);

      switch (frame_type) {
        case AMQP_FRAME_METHOD: {
          amqp_bytes_t encoded;

          /* A method frame carries at least its four-byte method ID; a
             shorter one would make the length below wrap around. */
          if (state->target_size < HEADER_SIZE + 4 + FOOTER_SIZE) {
            return -EINVAL;
          }

          /* Four bytes of method ID before the method args. */
          encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
          encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);

          decoded_frame->frame_type = AMQP_FRAME_METHOD;
          decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
          AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
                                               &state->decoding_pool,
                                               encoded,
                                               &decoded_frame->payload.method.decoded));
          break;
        }

        case AMQP_FRAME_HEADER: {
          amqp_bytes_t encoded;

          /* Likewise, a content header frame carries at least its 12-byte
             fixed part. */
          if (state->target_size < HEADER_SIZE + 12 + FOOTER_SIZE) {
            return -EINVAL;
          }

          /* 12 bytes for properties header. */
          encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
          encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);

          decoded_frame->frame_type = AMQP_FRAME_HEADER;
          decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
          decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
          AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
                                                   &state->decoding_pool,
                                                   encoded,
                                                   &decoded_frame->payload.properties.decoded));
          break;
        }

        case AMQP_FRAME_BODY: {
          size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);

          decoded_frame->frame_type = AMQP_FRAME_BODY;
          decoded_frame->payload.body_fragment.len = fragment_len;
          decoded_frame->payload.body_fragment.bytes =
            D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
          break;
        }

        case AMQP_FRAME_HEARTBEAT:
          decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
          break;

        default:
          /* Ignore the frame by not changing frame_type away from 0. */
          break;
      }

      return_to_idle(state);

      if (decoded_frame->frame_type == AMQP_FRAME_METHOD &&
          decoded_frame->payload.method.id == AMQP_BASIC_RETURN_METHOD) {
        amqp_basic_return_t *m = decoded_frame->payload.method.decoded;

        if (state->basic_return_callback) {
          state->basic_return_callback(decoded_frame->channel, m,
                                       state->basic_return_callback_data);
        }
      }

      return total_bytes_consumed;
    }

    case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
      decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
      decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
      amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
                  "Invalid protocol header received");
      decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
      decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
      decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
      decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);

      return_to_idle(state);
      return total_bytes_consumed;

    default:
      amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
      /* Every path out of a non-void function must return a value, even
         if the assertion above normally does not come back. */
      return -EINVAL;
  }
}
286
amqp_release_buffers_ok(amqp_connection_state_t state)287 amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
288 return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
289 }
290
/*
 * Recycle the connection's frame pool and decoding pool.
 *
 * Calling this while the connection is not idle, or while frames are still
 * queued, is a programming error and trips an assertion. Use
 * amqp_release_buffers_ok() to test beforehand, or call
 * amqp_maybe_release_buffers() instead.
 */
void amqp_release_buffers(amqp_connection_state_t state)
{
  /* Precondition 1: no frame is currently being assembled. */
  ENFORCE_STATE(state, CONNECTION_STATE_IDLE);

  /* Precondition 2: no frame is queued. */
  amqp_assert(state->first_queued_frame == NULL,
              "Programming error: attempt to amqp_release_buffers while waiting events enqueued");

  recycle_amqp_pool(&state->frame_pool);
  recycle_amqp_pool(&state->decoding_pool);
}
300
/* Recycle the connection's pools when amqp_release_buffers_ok() says that is
   allowed; otherwise do nothing. */
void amqp_maybe_release_buffers(amqp_connection_state_t state)
{
  if (!amqp_release_buffers_ok(state)) {
    return;
  }

  amqp_release_buffers(state);
}
306
/*
 * Encode `frame` into the connection's outbound buffer, ready for sending.
 *
 * The frame type, channel and payload size are always written into the
 * outbound buffer. For method, header and heartbeat frames the payload and
 * the frame end marker are written there as well. For body frames the
 * payload is not copied: *encoded is set to the caller's body fragment and
 * the caller has to send header, payload and end marker separately.
 *
 * encoded     - out: the payload area (inside the outbound buffer for method
 *               and header frames, the body fragment for body frames, empty
 *               for heartbeats).
 * payload_len - out: value written to the frame's size field.
 *
 * Returns 1 when the payload must be sent separately (body frames), 0 when
 * the outbound buffer holds the complete frame, or -EINVAL for an unknown
 * frame type. Encoder results pass through AMQP_CHECK_RESULT.
 * NOTE(review): that macro presumably returns early on a negative result;
 * confirm in amqp_private.h.
 *
 * When a heartbeat is configured the current time is stored in last_send.
 */
static int inner_send_frame(amqp_connection_state_t state,
                            amqp_frame_t const *frame,
                            amqp_bytes_t *encoded,
                            int *payload_len)
{
  int separate_body = 0;
  int encoded_len;

  E_8(state->outbound_buffer, 0, frame->frame_type);
  E_16(state->outbound_buffer, 1, frame->channel);

  if (frame->frame_type == AMQP_FRAME_METHOD) {
    /* Method ID first, then the encoded arguments. */
    E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
    encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
    encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
    encoded_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
                                                       frame->payload.method.decoded,
                                                       *encoded));
    *payload_len = encoded_len + 4;
  } else if (frame->frame_type == AMQP_FRAME_HEADER) {
    /* Class ID, weight and body size, then the encoded properties. */
    E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
    E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
    E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
    encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
    encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
    encoded_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
                                                           frame->payload.properties.decoded,
                                                           *encoded));
    *payload_len = encoded_len + 12;
  } else if (frame->frame_type == AMQP_FRAME_BODY) {
    /* The fragment stays where it is; only the header goes in the buffer. */
    *encoded = frame->payload.body_fragment;
    *payload_len = encoded->len;
    separate_body = 1;
  } else if (frame->frame_type == AMQP_FRAME_HEARTBEAT) {
    *encoded = AMQP_EMPTY_BYTES;
    *payload_len = 0;
  } else {
    return -EINVAL;
  }

  E_32(state->outbound_buffer, 3, *payload_len);
  if (separate_body == 0) {
    E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
  }

#if 0
  if (separate_body) {
    printf("sending body frame (header):\n");
    amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
    printf("sending body frame (payload):\n");
    amqp_dump(encoded->bytes, *payload_len);
  } else {
    printf("sending:\n");
    amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
  }
#endif

  if (state->heartbeat) {
    gettimeofday(&state->last_send, NULL);
  }

  return separate_body;
}
375
/*
 * Write exactly `len` bytes from `buf` to descriptor `fd`.
 *
 * write() may accept fewer bytes than requested or be interrupted by a
 * signal; this helper keeps going until everything has been written.
 * Returns 0 on success or a negative errno value on failure (-EIO when
 * write() reports no progress at all).
 */
static int write_all_bytes(int fd, const void *buf, size_t len)
{
  const unsigned char *cursor = buf;

  while (len > 0) {
    ssize_t written = write(fd, cursor, len);

    if (written < 0) {
      if (errno == EINTR) {
        continue;
      }
      return -errno;
    }
    if (written == 0) {
      /* Avoid spinning forever if the descriptor accepts nothing. */
      return -EIO;
    }

    cursor += written;
    len -= (size_t) written;
  }

  return 0;
}

/*
 * Encode `frame` and send it over the connection's socket descriptor.
 *
 * Method, header and heartbeat frames are written from the outbound buffer
 * in one piece. Body frames are written as header, payload (taken directly
 * from the caller's fragment) and end marker.
 *
 * Returns 0 once the whole frame has been written. Returns a negative value
 * on failure: the error reported while encoding the frame, or the negative
 * errno of the failed write. A short write is never reported as success.
 */
int amqp_send_frame(amqp_connection_state_t state,
                    amqp_frame_t const *frame)
{
  amqp_bytes_t encoded;
  int payload_len;
  int separate_body;
  int rc;

  separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
  switch (separate_body) {
    case 0:
      return write_all_bytes(state->sockfd,
                             state->outbound_buffer.bytes,
                             (size_t) payload_len + HEADER_SIZE + FOOTER_SIZE);

    case 1: {
      unsigned char frame_end_byte = AMQP_FRAME_END;

      rc = write_all_bytes(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE);
      if (rc < 0) {
        return rc;
      }

      rc = write_all_bytes(state->sockfd, encoded.bytes, (size_t) payload_len);
      if (rc < 0) {
        return rc;
      }

      /* The end marker is sent from a single local byte. */
      assert(FOOTER_SIZE == 1);
      return write_all_bytes(state->sockfd, &frame_end_byte, FOOTER_SIZE);
    }

    default:
      /* Encoding failed; pass the error on unchanged. */
      return separate_body;
  }
}
405
/*
 * Encode `frame` and hand the resulting bytes to an output function instead
 * of writing them to the socket.
 *
 * fn      - called as fn(context, bytes, length): once with the complete
 *           frame for method, header and heartbeat frames; three times
 *           (header, payload, end marker) for body frames.
 * context - opaque pointer passed through to every fn call.
 *
 * Returns 0 after all pieces have been handed over, or the value reported by
 * inner_send_frame when it is neither 0 nor 1. Each fn result passes through
 * AMQP_CHECK_RESULT.
 * NOTE(review): that macro presumably returns early on a negative result;
 * confirm in amqp_private.h.
 */
int amqp_send_frame_to(amqp_connection_state_t state,
                       amqp_frame_t const *frame,
                       amqp_output_fn_t fn,
                       void *context)
{
  amqp_bytes_t body;
  int body_len;
  int mode = inner_send_frame(state, frame, &body, &body_len);

  if (mode == 0) {
    /* The outbound buffer already holds the whole frame. */
    AMQP_CHECK_RESULT(fn(context,
                         state->outbound_buffer.bytes,
                         body_len + (HEADER_SIZE + FOOTER_SIZE)));
    return 0;
  }

  if (mode != 1) {
    return mode;
  }

  /* Body frame: header from the outbound buffer, payload from the caller's
     fragment, then the single end-marker byte. */
  AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
  AMQP_CHECK_RESULT(fn(context, body.bytes, body_len));
  {
    unsigned char terminator = AMQP_FRAME_END;

    assert(FOOTER_SIZE == 1);
    AMQP_CHECK_RESULT(fn(context, &terminator, FOOTER_SIZE));
  }

  return 0;
}
437