1 /*
2 +----------------------------------------------------------------------+
3 | PHP Version 5 |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 1997-2007 The PHP Group |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
15 | Author: Alexandre Kalendarev akalend@mail.ru Copyright (c) 2009-2010 |
16 | Lead: |
17 | - Pieter de Zwart |
18 | Maintainers: |
19 | - Brad Rodriguez |
20 | - Jonathan Tansavatdi |
21 +----------------------------------------------------------------------+
22 */
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include "php.h"
28 #include "php_ini.h"
29 #include "ext/standard/info.h"
30 #include "zend_exceptions.h"
31
32 #ifdef PHP_WIN32
33 # include "win32/php_stdint.h"
34 # include "win32/signal.h"
35 #else
36 # include <stdint.h>
37 # include <signal.h>
38 #endif
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 #ifdef PHP_WIN32
44 # include "win32/unistd.h"
45 #else
46 # include <unistd.h>
47 #endif
48
49 #include "php_amqp.h"
50 #include "amqp_connection.h"
51 #include "amqp_methods_handling.h"
52 #include "amqp_connection_resource.h"
53 #include "amqp_channel.h"
54
55 zend_class_entry *amqp_channel_class_entry;
56 #define this_ce amqp_channel_class_entry
57
58 zend_object_handlers amqp_channel_object_handlers;
59
php_amqp_close_channel(amqp_channel_resource * channel_resource,zend_bool check_errors TSRMLS_DC)60 void php_amqp_close_channel(amqp_channel_resource *channel_resource, zend_bool check_errors TSRMLS_DC)
61 {
62 assert(channel_resource != NULL);
63
64 amqp_connection_resource *connection_resource = channel_resource->connection_resource;
65
66 if (connection_resource != NULL) {
67 /* First, remove it from active channels table to prevent recursion in case of connection error */
68 php_amqp_connection_resource_unregister_channel(connection_resource, channel_resource->channel_id);
69 } else {
70 channel_resource->is_connected = '\0';
71 }
72
73 assert(channel_resource->connection_resource == NULL);
74
75 if (!channel_resource->is_connected) {
76 /* Nothing to do more - channel was previously marked as closed, possibly, due to channel-level error */
77 return;
78 }
79
80 channel_resource->is_connected = '\0';
81
82 if (connection_resource && connection_resource->is_connected && channel_resource->channel_id > 0) {
83 assert(connection_resource != NULL);
84
85 amqp_channel_close(connection_resource->connection_state, channel_resource->channel_id, AMQP_REPLY_SUCCESS);
86
87 amqp_rpc_reply_t res = amqp_get_rpc_reply(connection_resource->connection_state);
88
89 if (check_errors && PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
90 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
91 return;
92 }
93
94 php_amqp_maybe_release_buffers_on_channel(connection_resource, channel_resource);
95 }
96 }
97
98 #if PHP_MAJOR_VERSION >= 7
99
php_amqp_destroy_fci(zend_fcall_info * fci)100 static void php_amqp_destroy_fci(zend_fcall_info *fci) {
101 if (fci->size > 0) {
102 zval_ptr_dtor(&fci->function_name);
103 if (fci->object != NULL) {
104 #if PHP_VERSION_ID >= 70300
105 GC_DELREF(fci->object);
106 #else
107 GC_REFCOUNT(fci->object)--;
108 #endif
109 }
110 fci->size = 0;
111 }
112 }
113
php_amqp_duplicate_fci(zend_fcall_info * source)114 static void php_amqp_duplicate_fci(zend_fcall_info *source) {
115 if (source->size > 0) {
116
117 zval_add_ref(&source->function_name);
118 if (source->object != NULL) {
119 #if PHP_VERSION_ID >= 70300
120 GC_ADDREF(source->object);
121 #else
122 GC_REFCOUNT(source->object)++;
123 #endif
124 }
125 }
126 }
127
php_amqp_get_fci_gc_data_count(zend_fcall_info * fci)128 static int php_amqp_get_fci_gc_data_count(zend_fcall_info *fci) {
129 int cnt = 0;
130
131 if (fci->size > 0) {
132 cnt ++;
133
134 if (fci->object != NULL) {
135 cnt++;
136 }
137 }
138
139 return cnt;
140 }
141
php_amqp_get_fci_gc_data(zend_fcall_info * fci,zval * gc_data)142 static zval * php_amqp_get_fci_gc_data(zend_fcall_info *fci, zval *gc_data) {
143 if (ZEND_FCI_INITIALIZED(*fci)) {
144
145 ZVAL_COPY_VALUE(gc_data++, &fci->function_name);
146
147 if (fci->object != NULL) {
148 ZVAL_OBJ(gc_data++, fci->object);
149 }
150 }
151
152 return gc_data;
153 }
154
amqp_channel_gc(zval * object,zval ** table,int * n)155 static HashTable *amqp_channel_gc(zval *object, zval **table, int *n) /* {{{ */
156 {
157 amqp_channel_object *channel = PHP_AMQP_GET_CHANNEL(object);
158
159 int basic_return_cnt = php_amqp_get_fci_gc_data_count(&channel->callbacks.basic_return.fci);
160 int basic_ack_cnt = php_amqp_get_fci_gc_data_count(&channel->callbacks.basic_ack.fci);
161 int basic_nack_cnt = php_amqp_get_fci_gc_data_count(&channel->callbacks.basic_nack.fci);
162
163 int cnt = basic_return_cnt + basic_ack_cnt + basic_nack_cnt;
164
165 if (cnt > channel->gc_data_count) {
166 channel->gc_data_count = cnt;
167 channel->gc_data = (zval *) erealloc(channel->gc_data, sizeof(zval) * cnt);
168 }
169
170 zval *gc_data = channel->gc_data;
171
172 gc_data = php_amqp_get_fci_gc_data(&channel->callbacks.basic_return.fci, gc_data);
173 gc_data = php_amqp_get_fci_gc_data(&channel->callbacks.basic_ack.fci, gc_data);
174 php_amqp_get_fci_gc_data(&channel->callbacks.basic_nack.fci, gc_data);
175
176 *table = channel->gc_data;
177 *n = cnt;
178
179 return zend_std_get_properties(object TSRMLS_CC);
180 } /* }}} */
181
182 #else
php_amqp_destroy_fci(zend_fcall_info * fci)183 static void php_amqp_destroy_fci(zend_fcall_info *fci) {
184 if (fci->size > 0) {
185 zval_ptr_dtor(&fci->function_name);
186 if (fci->object_ptr != NULL) {
187 zval_ptr_dtor(&fci->object_ptr);
188 }
189 fci->size = 0;
190 }
191 }
192
php_amqp_duplicate_fci(zend_fcall_info * source)193 static void php_amqp_duplicate_fci(zend_fcall_info *source) {
194 if (source->size > 0) {
195
196 zval_add_ref(&source->function_name);
197 if (source->object_ptr != NULL) {
198 zval_add_ref(&source->object_ptr);
199 }
200 }
201 }
202
php_amqp_get_fci_gc_data_count(zend_fcall_info * fci)203 static int php_amqp_get_fci_gc_data_count(zend_fcall_info *fci) {
204 int cnt = 0;
205
206 if (fci->size > 0) {
207 cnt ++;
208
209 if (fci->object_ptr != NULL) {
210 cnt++;
211 }
212 }
213
214 return cnt;
215 }
216
php_amqp_get_fci_gc_data(zend_fcall_info * fci,zval ** gc_data,int offset)217 static int php_amqp_get_fci_gc_data(zend_fcall_info *fci, zval **gc_data, int offset) {
218
219 if (ZEND_FCI_INITIALIZED(*fci)) {
220 gc_data[offset++] = fci->function_name;
221
222 if (fci->object_ptr != NULL) {
223 gc_data[offset++] = fci->object_ptr;
224 }
225 }
226
227 return offset;
228 }
229
amqp_channel_gc(zval * object,zval *** table,int * n TSRMLS_DC)230 static HashTable *amqp_channel_gc(zval *object, zval ***table, int *n TSRMLS_DC) /* {{{ */
231 {
232 amqp_channel_object *channel = PHP_AMQP_GET_CHANNEL(object);
233
234 int basic_return_cnt = php_amqp_get_fci_gc_data_count(&channel->callbacks.basic_return.fci);
235 int basic_ack_cnt = php_amqp_get_fci_gc_data_count(&channel->callbacks.basic_ack.fci);
236 int basic_nack_cnt = php_amqp_get_fci_gc_data_count(&channel->callbacks.basic_nack.fci);
237
238 int cnt = basic_return_cnt + basic_ack_cnt + basic_nack_cnt;
239
240 if (cnt > channel->gc_data_count) {
241 channel->gc_data_count = cnt;
242 channel->gc_data = (zval **) erealloc(channel->gc_data, sizeof(zval *) * channel->gc_data_count);
243 }
244
245 php_amqp_get_fci_gc_data(&channel->callbacks.basic_return.fci, channel->gc_data, 0);
246 php_amqp_get_fci_gc_data(&channel->callbacks.basic_ack.fci, channel->gc_data, basic_return_cnt);
247 php_amqp_get_fci_gc_data(&channel->callbacks.basic_nack.fci, channel->gc_data, basic_return_cnt + basic_ack_cnt);
248
249 *table = channel->gc_data;
250 *n = cnt;
251
252 return zend_std_get_properties(object TSRMLS_CC);
253 } /* }}} */
254
255 #endif
256
php_amqp_clean_callbacks(amqp_channel_callbacks * callbacks)257 static void php_amqp_clean_callbacks(amqp_channel_callbacks *callbacks) {
258 php_amqp_destroy_fci(&callbacks->basic_return.fci);
259 php_amqp_destroy_fci(&callbacks->basic_ack.fci);
260 php_amqp_destroy_fci(&callbacks->basic_nack.fci);
261 }
262
263
amqp_channel_free(PHP5to7_obj_free_zend_object * object TSRMLS_DC)264 void amqp_channel_free(PHP5to7_obj_free_zend_object *object TSRMLS_DC)
265 {
266 amqp_channel_object *channel = PHP_AMQP_FETCH_CHANNEL(object);
267
268 if (channel->channel_resource != NULL) {
269 php_amqp_close_channel(channel->channel_resource, 0 TSRMLS_CC);
270
271 efree(channel->channel_resource);
272 channel->channel_resource = NULL;
273 }
274
275 if (channel->gc_data) {
276 efree(channel->gc_data);
277 }
278
279 php_amqp_clean_callbacks(&channel->callbacks);
280
281 zend_object_std_dtor(&channel->zo TSRMLS_CC);
282
283 #if PHP_MAJOR_VERSION < 7
284 if (channel->this_ptr) {
285 channel->this_ptr = NULL;
286 }
287
288 efree(object);
289 #endif
290 }
291
292
amqp_channel_ctor(zend_class_entry * ce TSRMLS_DC)293 PHP5to7_zend_object_value amqp_channel_ctor(zend_class_entry *ce TSRMLS_DC)
294 {
295 amqp_channel_object *channel = PHP5to7_ECALLOC_CHANNEL_OBJECT(ce);
296
297 zend_object_std_init(&channel->zo, ce TSRMLS_CC);
298 AMQP_OBJECT_PROPERTIES_INIT(channel->zo, ce);
299
300 #if PHP_MAJOR_VERSION >=7
301 channel->zo.handlers = &amqp_channel_object_handlers;
302
303 return &channel->zo;
304 #else
305 PHP5to7_zend_object_value new_value;
306
307 new_value.handle = zend_objects_store_put(
308 channel,
309 NULL,
310 (zend_objects_free_object_storage_t) amqp_channel_free,
311 NULL TSRMLS_CC
312 );
313
314 new_value.handlers = zend_get_std_object_handlers();
315
316 return new_value;
317 #endif
318 }
319
320
321 /* {{{ proto AMQPChannel::__construct(AMQPConnection obj)
322 */
PHP_METHOD(amqp_channel_class,__construct)323 static PHP_METHOD(amqp_channel_class, __construct)
324 {
325 PHP5to7_READ_PROP_RV_PARAM_DECL;
326
327 zval *connection_object = NULL;
328
329 amqp_channel_resource *channel_resource;
330 amqp_channel_object *channel;
331 amqp_connection_object *connection;
332
333 /* Parse out the method parameters */
334 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "o", &connection_object) == FAILURE) {
335 zend_throw_exception(amqp_channel_exception_class_entry, "Parameter must be an instance of AMQPConnection.", 0 TSRMLS_CC);
336 RETURN_NULL();
337 }
338
339 PHP5to7_zval_t consumers PHP5to7_MAYBE_SET_TO_NULL;
340
341 PHP5to7_MAYBE_INIT(consumers);
342 PHP5to7_ARRAY_INIT(consumers);
343
344 zend_update_property(this_ce, getThis(), ZEND_STRL("consumers"), PHP5to7_MAYBE_PTR(consumers) TSRMLS_CC);
345
346 PHP5to7_MAYBE_DESTROY(consumers);
347
348 channel = PHP_AMQP_GET_CHANNEL(getThis());
349 #if PHP_MAJOR_VERSION < 7
350 channel->this_ptr = getThis();
351 #endif
352
353 /* Set the prefetch count */
354 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_count"), INI_INT("amqp.prefetch_count") TSRMLS_CC);
355
356 /* Set the prefetch size */
357 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_size"), INI_INT("amqp.prefetch_size") TSRMLS_CC);
358
359 /* Set the global prefetch count */
360 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_count"), INI_INT("amqp.global_prefetch_count") TSRMLS_CC);
361
362 /* Set the global prefetch size */
363 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_size"), INI_INT("amqp.global_prefetch_size") TSRMLS_CC);
364
365 /* Pull out and verify the connection */
366 connection = PHP_AMQP_GET_CONNECTION(connection_object);
367 PHP_AMQP_VERIFY_CONNECTION(connection, "Could not create channel.");
368
369 if (!connection->connection_resource) {
370 zend_throw_exception(amqp_channel_exception_class_entry, "Could not create channel. No connection resource.", 0 TSRMLS_CC);
371 return;
372 }
373
374 if (!connection->connection_resource->is_connected) {
375 zend_throw_exception(amqp_channel_exception_class_entry, "Could not create channel. Connection resource is not connected.", 0 TSRMLS_CC);
376 return;
377 }
378
379 zend_update_property(this_ce, getThis(), ZEND_STRL("connection"), connection_object TSRMLS_CC);
380
381 channel_resource = (amqp_channel_resource*)ecalloc(1, sizeof(amqp_channel_resource));
382 channel->channel_resource = channel_resource;
383 channel_resource->parent = channel;
384
385 /* Figure out what the next available channel is on this connection */
386 channel_resource->channel_id = php_amqp_connection_resource_get_available_channel_id(connection->connection_resource);
387
388 /* Check that we got a valid channel */
389 if (!channel_resource->channel_id) {
390 zend_throw_exception(amqp_channel_exception_class_entry, "Could not create channel. Connection has no open channel slots remaining.", 0 TSRMLS_CC);
391 return;
392 }
393
394 if (php_amqp_connection_resource_register_channel(connection->connection_resource, channel_resource, channel_resource->channel_id) == FAILURE) {
395 zend_throw_exception(amqp_channel_exception_class_entry, "Could not create channel. Failed to add channel to connection slot.", 0 TSRMLS_CC);
396 }
397
398 /* Open up the channel for use */
399 amqp_channel_open_ok_t *r = amqp_channel_open(channel_resource->connection_resource->connection_state, channel_resource->channel_id);
400
401
402 if (!r) {
403 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
404
405 php_amqp_error(res, &PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource TSRMLS_CC);
406
407 php_amqp_zend_throw_exception(res, amqp_channel_exception_class_entry, PHP_AMQP_G(error_message), PHP_AMQP_G(error_code) TSRMLS_CC);
408 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
409
410 /* Prevent double free, it may happens in case case channel resource was already freed due to some hard error. */
411 if (channel_resource->connection_resource) {
412 php_amqp_connection_resource_unregister_channel(channel_resource->connection_resource, channel_resource->channel_id);
413 channel_resource->channel_id = 0;
414 }
415
416 return;
417 }
418
419 /* r->channel_id is a 16-bit channel number insibe amqp_bytes_t, assertion below will without converting to uint16_t*/
420 /* assert (r->channel_id == channel_resource->channel_id);*/
421 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
422
423 channel_resource->is_connected = '\1';
424
425 /* Set the prefetch count: */
426 amqp_basic_qos(
427 channel_resource->connection_resource->connection_state,
428 channel_resource->channel_id,
429 0, /* prefetch window size */
430 (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("prefetch_count"), /* prefetch message count */
431 /* NOTE that RabbitMQ has reinterpreted global flag field. See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.global for details */
432 0 /* global flag */
433 );
434
435 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
436
437 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
438 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
439 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
440 return;
441 }
442
443 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
444
445 uint16_t global_prefetch_size = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_size");
446 uint16_t global_prefetch_count = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_count");
447
448 /* Set the global prefetch settings (ignoring if 0 for backwards compatibility) */
449 if (global_prefetch_size != 0 || global_prefetch_count != 0) {
450 amqp_basic_qos(
451 channel_resource->connection_resource->connection_state,
452 channel_resource->channel_id,
453 global_prefetch_size,
454 global_prefetch_count,
455 1
456 );
457
458 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
459
460 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
461 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
462 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
463 return;
464 }
465
466 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
467 }
468 }
469 /* }}} */
470
471
472 /* {{{ proto bool amqp::isConnected()
473 check amqp channel */
PHP_METHOD(amqp_channel_class,isConnected)474 static PHP_METHOD(amqp_channel_class, isConnected)
475 {
476 amqp_channel_resource *channel_resource;
477
478 PHP_AMQP_NOPARAMS();
479
480 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
481
482 RETURN_BOOL(channel_resource && channel_resource->is_connected);
483 }
484 /* }}} */
485
486 /* {{{ proto bool AMQPChannel::close()
487 Close amqp channel */
PHP_METHOD(amqp_channel_class,close)488 static PHP_METHOD(amqp_channel_class, close)
489 {
490 amqp_channel_resource *channel_resource;
491
492 PHP_AMQP_NOPARAMS();
493
494 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
495
496 if(channel_resource && channel_resource->is_connected) {
497 php_amqp_close_channel(channel_resource, 1 TSRMLS_CC);
498 }
499 }
500 /* }}} */
501
502 /* {{{ proto bool amqp::getChannelId()
503 get amqp channel ID */
PHP_METHOD(amqp_channel_class,getChannelId)504 static PHP_METHOD(amqp_channel_class, getChannelId)
505 {
506 amqp_channel_resource *channel_resource;
507
508 PHP_AMQP_NOPARAMS();
509
510 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
511
512 if (!channel_resource) {
513 RETURN_NULL();
514 }
515
516 RETURN_LONG(channel_resource->channel_id);
517 }
518 /* }}} */
519
520 /* {{{ proto bool amqp::setPrefetchCount(long count)
521 set the number of prefetches */
PHP_METHOD(amqp_channel_class,setPrefetchCount)522 static PHP_METHOD(amqp_channel_class, setPrefetchCount)
523 {
524 PHP5to7_READ_PROP_RV_PARAM_DECL;
525
526 amqp_channel_resource *channel_resource;
527 PHP5to7_param_long_type_t prefetch_count;
528
529 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &prefetch_count) == FAILURE) {
530 return;
531 }
532
533 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
534 PHP_AMQP_VERIFY_CHANNEL_CONNECTION_RESOURCE(channel_resource, "Could not set prefetch count.");
535 // TODO: verify that connection is active and resource exists. that is enough
536
537 /* If we are already connected, set the new prefetch count */
538 if (channel_resource->is_connected) {
539 amqp_basic_qos(
540 channel_resource->connection_resource->connection_state,
541 channel_resource->channel_id,
542 0,
543 (uint16_t)prefetch_count,
544 0
545 );
546
547 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
548
549 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
550 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
551 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
552 return;
553 }
554
555 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
556
557 uint16_t global_prefetch_size = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_size");
558 uint16_t global_prefetch_count = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_count");
559
560 /* Re-apply current global prefetch settings if set (writing consumer prefetch settings will clear global prefetch settings) */
561 if (global_prefetch_size != 0 || global_prefetch_count != 0) {
562 amqp_basic_qos(
563 channel_resource->connection_resource->connection_state,
564 channel_resource->channel_id,
565 global_prefetch_size,
566 global_prefetch_count,
567 1
568 );
569
570 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
571
572 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
573 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
574 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
575 return;
576 }
577
578 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
579 }
580 }
581
582 /* Set the prefetch count - the implication is to disable the size */
583 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_count"), prefetch_count TSRMLS_CC);
584 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_size"), 0 TSRMLS_CC);
585
586 RETURN_TRUE;
587 }
588 /* }}} */
589
590 /* {{{ proto long amqp::getPrefetchCount()
591 get the number of prefetches */
PHP_METHOD(amqp_channel_class,getPrefetchCount)592 static PHP_METHOD(amqp_channel_class, getPrefetchCount)
593 {
594 PHP5to7_READ_PROP_RV_PARAM_DECL;
595 PHP_AMQP_NOPARAMS();
596 PHP_AMQP_RETURN_THIS_PROP("prefetch_count")
597 }
598 /* }}} */
599
600 /* {{{ proto bool amqp::setPrefetchSize(long size)
601 set the number of prefetches */
PHP_METHOD(amqp_channel_class,setPrefetchSize)602 static PHP_METHOD(amqp_channel_class, setPrefetchSize)
603 {
604 PHP5to7_READ_PROP_RV_PARAM_DECL;
605
606 amqp_channel_resource *channel_resource;
607 PHP5to7_param_long_type_t prefetch_size;
608
609 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &prefetch_size) == FAILURE) {
610 return;
611 }
612
613 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
614 PHP_AMQP_VERIFY_CHANNEL_CONNECTION_RESOURCE(channel_resource, "Could not set prefetch size.");
615
616 /* If we are already connected, set the new prefetch count */
617 if (channel_resource->is_connected) {
618 amqp_basic_qos(
619 channel_resource->connection_resource->connection_state,
620 channel_resource->channel_id,
621 (uint16_t)prefetch_size,
622 0,
623 0
624 );
625
626 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
627
628 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
629 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
630 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
631 return;
632 }
633
634 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
635
636 uint16_t global_prefetch_size = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_size");
637 uint16_t global_prefetch_count = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_count");
638
639 /* Re-apply current global prefetch settings if set (writing consumer prefetch settings will clear global prefetch settings) */
640 if (global_prefetch_size != 0 || global_prefetch_count != 0) {
641 amqp_basic_qos(
642 channel_resource->connection_resource->connection_state,
643 channel_resource->channel_id,
644 global_prefetch_size,
645 global_prefetch_count,
646 1
647 );
648
649 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
650
651 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
652 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
653 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
654 return;
655 }
656
657 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
658 }
659 }
660
661 /* Set the prefetch size - the implication is to disable the count */
662 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_count"), 0 TSRMLS_CC);
663 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_size"), prefetch_size TSRMLS_CC);
664
665 RETURN_TRUE;
666 }
667 /* }}} */
668
669 /* {{{ proto long amqp::getPrefetchSize()
670 get the number of prefetches */
PHP_METHOD(amqp_channel_class,getPrefetchSize)671 static PHP_METHOD(amqp_channel_class, getPrefetchSize)
672 {
673 PHP5to7_READ_PROP_RV_PARAM_DECL;
674 PHP_AMQP_NOPARAMS();
675 PHP_AMQP_RETURN_THIS_PROP("prefetch_size")
676 }
677 /* }}} */
678
679 /* {{{ proto bool amqp::setGlobalPrefetchCount(long count)
680 set the number of prefetches */
PHP_METHOD(amqp_channel_class,setGlobalPrefetchCount)681 static PHP_METHOD(amqp_channel_class, setGlobalPrefetchCount)
682 {
683 PHP5to7_READ_PROP_RV_PARAM_DECL;
684
685 amqp_channel_resource *channel_resource;
686 PHP5to7_param_long_type_t global_prefetch_count;
687
688 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &global_prefetch_count) == FAILURE) {
689 return;
690 }
691
692 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
693 PHP_AMQP_VERIFY_CHANNEL_CONNECTION_RESOURCE(channel_resource, "Could not set global prefetch count.");
694
695 /* If we are already connected, set the new prefetch count */
696 if (channel_resource->is_connected) {
697 /* Applying global prefetch settings retains existing consumer prefetch settings */
698 amqp_basic_qos(
699 channel_resource->connection_resource->connection_state,
700 channel_resource->channel_id,
701 0,
702 (uint16_t)global_prefetch_count,
703 1
704 );
705
706 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
707
708 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
709 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
710 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
711 return;
712 }
713
714 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
715 }
716
717 /* Set the global prefetch count - the implication is to disable the size */
718 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_count"), global_prefetch_count TSRMLS_CC);
719 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_size"), 0 TSRMLS_CC);
720
721 RETURN_TRUE;
722 }
723 /* }}} */
724
725 /* {{{ proto long amqp::getGlobalPrefetchCount()
726 get the number of prefetches */
PHP_METHOD(amqp_channel_class,getGlobalPrefetchCount)727 static PHP_METHOD(amqp_channel_class, getGlobalPrefetchCount)
728 {
729 PHP5to7_READ_PROP_RV_PARAM_DECL;
730 PHP_AMQP_NOPARAMS();
731 PHP_AMQP_RETURN_THIS_PROP("global_prefetch_count")
732 }
733 /* }}} */
734
735 /* {{{ proto bool amqp::setGlobalPrefetchSize(long size)
736 set the number of prefetches */
PHP_METHOD(amqp_channel_class,setGlobalPrefetchSize)737 static PHP_METHOD(amqp_channel_class, setGlobalPrefetchSize)
738 {
739 PHP5to7_READ_PROP_RV_PARAM_DECL;
740
741 amqp_channel_resource *channel_resource;
742 PHP5to7_param_long_type_t global_prefetch_size;
743
744 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &global_prefetch_size) == FAILURE) {
745 return;
746 }
747
748 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
749 PHP_AMQP_VERIFY_CHANNEL_CONNECTION_RESOURCE(channel_resource, "Could not set prefetch size.");
750
751 /* If we are already connected, set the new prefetch count */
752 if (channel_resource->is_connected) {
753 /* Applying global prefetch settings retains existing consumer prefetch settings */
754 amqp_basic_qos(
755 channel_resource->connection_resource->connection_state,
756 channel_resource->channel_id,
757 (uint16_t)global_prefetch_size,
758 0,
759 1
760 );
761
762 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
763
764 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
765 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
766 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
767 return;
768 }
769
770 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
771 }
772
773 /* Set the global prefetch size - the implication is to disable the count */
774 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_count"), 0 TSRMLS_CC);
775 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_size"), global_prefetch_size TSRMLS_CC);
776
777 RETURN_TRUE;
778 }
779 /* }}} */
780
781 /* {{{ proto long amqp::getGlobalPrefetchSize()
782 get the number of prefetches */
PHP_METHOD(amqp_channel_class,getGlobalPrefetchSize)783 static PHP_METHOD(amqp_channel_class, getGlobalPrefetchSize)
784 {
785 PHP5to7_READ_PROP_RV_PARAM_DECL;
786 PHP_AMQP_NOPARAMS();
787 PHP_AMQP_RETURN_THIS_PROP("global_prefetch_size")
788 }
789 /* }}} */
790
791 /* {{{ proto amqp::qos(long size, long count, bool global)
792 set the number of prefetches */
PHP_METHOD(amqp_channel_class,qos)793 static PHP_METHOD(amqp_channel_class, qos)
794 {
795 PHP5to7_READ_PROP_RV_PARAM_DECL;
796
797 amqp_channel_resource *channel_resource;
798 PHP5to7_param_long_type_t prefetch_size;
799 PHP5to7_param_long_type_t prefetch_count;
800 zend_bool global = 0;
801
802 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|b", &prefetch_size, &prefetch_count, &global) == FAILURE) {
803 return;
804 }
805
806 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
807 PHP_AMQP_VERIFY_CHANNEL_CONNECTION_RESOURCE(channel_resource, "Could not set qos parameters.");
808
809 /* Set the prefetch size and prefetch count */
810 if (global) {
811 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_size"), prefetch_size TSRMLS_CC);
812 zend_update_property_long(this_ce, getThis(), ZEND_STRL("global_prefetch_count"), prefetch_count TSRMLS_CC);
813 } else {
814 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_size"), prefetch_size TSRMLS_CC);
815 zend_update_property_long(this_ce, getThis(), ZEND_STRL("prefetch_count"), prefetch_count TSRMLS_CC);
816 }
817
818 /* If we are already connected, set the new prefetch count */
819 if (channel_resource->is_connected) {
820 amqp_basic_qos(
821 channel_resource->connection_resource->connection_state,
822 channel_resource->channel_id,
823 (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("prefetch_size"),
824 (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("prefetch_count"),
825 /* NOTE that RabbitMQ has reinterpreted global flag field. See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.global for details */
826 0 /* Global flag - whether this change should affect every channel_resource */
827 );
828
829 amqp_rpc_reply_t res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
830
831 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
832 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
833 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
834 return;
835 }
836
837 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
838
839 uint16_t global_prefetch_size = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_size");
840 uint16_t global_prefetch_count = (uint16_t)PHP_AMQP_READ_THIS_PROP_LONG("global_prefetch_count");
841
842 /* Re-apply current global prefetch settings if set (writing consumer prefetch settings will clear global prefetch settings) */
843 if (global_prefetch_size != 0 || global_prefetch_count != 0) {
844 amqp_basic_qos(
845 channel_resource->connection_resource->connection_state,
846 channel_resource->channel_id,
847 global_prefetch_size,
848 global_prefetch_count,
849 1
850 );
851
852 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
853
854 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
855 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
856 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
857 return;
858 }
859
860 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
861 }
862 }
863
864 RETURN_TRUE;
865 }
866 /* }}} */
867
868
869 /* {{{ proto amqp::startTransaction()
870 start a transaction on the given channel */
PHP_METHOD(amqp_channel_class,startTransaction)871 static PHP_METHOD(amqp_channel_class, startTransaction)
872 {
873 amqp_channel_resource *channel_resource;
874
875 amqp_rpc_reply_t res;
876
877 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") == FAILURE) {
878 return;
879 }
880
881 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
882 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not start the transaction.");
883
884 amqp_tx_select(
885 channel_resource->connection_resource->connection_state,
886 channel_resource->channel_id
887 );
888
889 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
890
891 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
892 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
893 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
894 return;
895 }
896
897 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
898
899 RETURN_TRUE;
900 }
901 /* }}} */
902
903
904 /* {{{ proto amqp::startTransaction()
905 start a transaction on the given channel */
PHP_METHOD(amqp_channel_class,commitTransaction)906 static PHP_METHOD(amqp_channel_class, commitTransaction)
907 {
908 amqp_channel_resource *channel_resource;
909
910 amqp_rpc_reply_t res;
911
912 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") == FAILURE) {
913 return;
914 }
915
916 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
917 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not start the transaction.");
918
919 amqp_tx_commit(
920 channel_resource->connection_resource->connection_state,
921 channel_resource->channel_id
922 );
923
924 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
925
926 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
927 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
928 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
929 return;
930 }
931
932 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
933
934 RETURN_TRUE;
935 }
936 /* }}} */
937
938 /* {{{ proto amqp::startTransaction()
939 start a transaction on the given channel */
PHP_METHOD(amqp_channel_class,rollbackTransaction)940 static PHP_METHOD(amqp_channel_class, rollbackTransaction)
941 {
942 amqp_channel_resource *channel_resource;
943
944 amqp_rpc_reply_t res;
945
946 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") == FAILURE) {
947 return;
948 }
949
950
951 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
952 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not rollback the transaction.");
953
954 amqp_tx_rollback(
955 channel_resource->connection_resource->connection_state,
956 channel_resource->channel_id
957 );
958
959 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
960
961 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
962 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
963 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
964 return;
965 }
966
967 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
968
969 RETURN_TRUE;
970 }
971 /* }}} */
972
973 /* {{{ proto AMQPChannel::getConnection()
974 Get the AMQPConnection object in use */
PHP_METHOD(amqp_channel_class,getConnection)975 static PHP_METHOD(amqp_channel_class, getConnection)
976 {
977 PHP5to7_READ_PROP_RV_PARAM_DECL;
978 PHP_AMQP_NOPARAMS();
979 PHP_AMQP_RETURN_THIS_PROP("connection")
980 }
981 /* }}} */
982
983 /* {{{ proto bool amqp::basicRecover([bool requeue=TRUE])
984 Redeliver unacknowledged messages */
PHP_METHOD(amqp_channel_class,basicRecover)985 static PHP_METHOD(amqp_channel_class, basicRecover)
986 {
987 amqp_channel_resource *channel_resource;
988
989 amqp_rpc_reply_t res;
990
991 zend_bool requeue = 1;
992
993 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &requeue) == FAILURE) {
994 return;
995 }
996
997 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
998 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not redeliver unacknowledged messages.");
999
1000 amqp_basic_recover(
1001 channel_resource->connection_resource->connection_state,
1002 channel_resource->channel_id,
1003 (amqp_boolean_t) requeue
1004 );
1005
1006 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
1007
1008 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
1009 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
1010 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1011 return;
1012 }
1013
1014 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1015
1016 RETURN_TRUE;
1017 }
1018 /* }}} */
1019
1020 /* {{{ proto bool amqp::confirmSelect()
1021 Redeliver unacknowledged messages */
PHP_METHOD(amqp_channel_class,confirmSelect)1022 PHP_METHOD(amqp_channel_class, confirmSelect)
1023 {
1024 amqp_channel_resource *channel_resource;
1025 amqp_rpc_reply_t res;
1026
1027 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") == FAILURE) {
1028 return;
1029 }
1030
1031 channel_resource = PHP_AMQP_GET_CHANNEL_RESOURCE(getThis());
1032 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not enable confirms mode.");
1033
1034 amqp_confirm_select(
1035 channel_resource->connection_resource->connection_state,
1036 channel_resource->channel_id
1037 );
1038
1039 res = amqp_get_rpc_reply(channel_resource->connection_resource->connection_state);
1040
1041 if (PHP_AMQP_MAYBE_ERROR(res, channel_resource)) {
1042 php_amqp_zend_throw_exception_short(res, amqp_channel_exception_class_entry TSRMLS_CC);
1043 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1044 return;
1045 }
1046
1047 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1048
1049 RETURN_TRUE;
1050 }
1051 /* }}} */
1052
1053 /* {{{ proto bool AMQPChannel::setReturnCallback(callable return_callback)
1054 Set callback for basic.return server method handling */
PHP_METHOD(amqp_channel_class,setReturnCallback)1055 PHP_METHOD(amqp_channel_class, setReturnCallback)
1056 {
1057 zend_fcall_info fci = empty_fcall_info;
1058 zend_fcall_info_cache fcc = empty_fcall_info_cache;
1059
1060 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f!", &fci, &fcc) == FAILURE) {
1061 return;
1062 }
1063
1064 amqp_channel_object *channel = PHP_AMQP_GET_CHANNEL(getThis());
1065
1066 php_amqp_destroy_fci(&channel->callbacks.basic_return.fci);
1067
1068 if (ZEND_FCI_INITIALIZED(fci)) {
1069 php_amqp_duplicate_fci(&fci);
1070 channel->callbacks.basic_return.fci = fci;
1071 channel->callbacks.basic_return.fcc = fcc;
1072 }
1073 }
1074 /* }}} */
1075
1076 /* {{{ proto bool AMQPChannel::waitForBasicReturn([double timeout=0.0])
1077 Wait for basic.return method from server */
PHP_METHOD(amqp_channel_class,waitForBasicReturn)1078 PHP_METHOD(amqp_channel_class, waitForBasicReturn)
1079 {
1080 amqp_channel_object *channel;
1081 amqp_channel_resource *channel_resource;
1082 amqp_method_t method;
1083
1084 int status;
1085
1086 double timeout = 0;
1087
1088 struct timeval tv = {0};
1089 struct timeval *tv_ptr = &tv;
1090
1091 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|d", &timeout) == FAILURE) {
1092 return;
1093 }
1094
1095 if (timeout < 0) {
1096 zend_throw_exception(amqp_channel_exception_class_entry, "Timeout must be greater than or equal to zero.", 0 TSRMLS_CC);
1097 return;
1098 }
1099
1100 channel = PHP_AMQP_GET_CHANNEL(getThis());
1101
1102 channel_resource = channel->channel_resource;
1103 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not start wait loop for basic.return method.");
1104
1105 if (timeout > 0) {
1106 tv.tv_sec = (long int) timeout;
1107 tv.tv_usec = (long int) ((timeout - tv.tv_sec) * 1000000);
1108 } else {
1109 tv_ptr = NULL;
1110 }
1111
1112 assert(channel_resource->channel_id > 0);
1113
1114 while(1) {
1115 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1116
1117 status = amqp_simple_wait_method_noblock(channel_resource->connection_resource->connection_state, channel_resource->channel_id, AMQP_BASIC_RETURN_METHOD, &method, tv_ptr);
1118
1119 if (AMQP_STATUS_TIMEOUT == status) {
1120 zend_throw_exception(amqp_queue_exception_class_entry, "Wait timeout exceed", 0 TSRMLS_CC);
1121
1122 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1123 return;
1124 }
1125
1126 if (status != AMQP_STATUS_OK) {
1127 /* Emulate library error */
1128 amqp_rpc_reply_t res;
1129
1130 if (AMQP_RESPONSE_SERVER_EXCEPTION == status) {
1131 res.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
1132 res.reply = method;
1133 } else {
1134 res.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
1135 res.library_error = status;
1136 }
1137
1138 php_amqp_error(res, &PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource TSRMLS_CC);
1139
1140 php_amqp_zend_throw_exception(res, amqp_queue_exception_class_entry, PHP_AMQP_G(error_message), PHP_AMQP_G(error_code) TSRMLS_CC);
1141 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1142 return;
1143 }
1144
1145 status = php_amqp_handle_basic_return(&PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource->channel_id, channel, &method TSRMLS_CC);
1146
1147 if (PHP_AMQP_RESOURCE_RESPONSE_BREAK == status) {
1148 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1149 break;
1150 }
1151
1152 if (PHP_AMQP_RESOURCE_RESPONSE_OK != status) {
1153 /* Emulate library error */
1154 amqp_rpc_reply_t res;
1155 res.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
1156 res.library_error = status;
1157
1158 php_amqp_error(res, &PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource TSRMLS_CC);
1159
1160 php_amqp_zend_throw_exception(res, amqp_channel_exception_class_entry, PHP_AMQP_G(error_message), PHP_AMQP_G(error_code) TSRMLS_CC);
1161 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1162 return;
1163 }
1164 }
1165 }
1166 /* }}} */
1167
1168 /* {{{ proto bool AMQPChannel::setConfirmCallback(callable ack_callback [, callable nack_callback = null])
1169 Set callback for basic.ack and, optionally, basic.nac server methods handling */
PHP_METHOD(amqp_channel_class,setConfirmCallback)1170 PHP_METHOD(amqp_channel_class, setConfirmCallback)
1171 {
1172 zend_fcall_info ack_fci = empty_fcall_info;
1173 zend_fcall_info_cache ack_fcc = empty_fcall_info_cache;
1174
1175 zend_fcall_info nack_fci = empty_fcall_info;
1176 zend_fcall_info_cache nack_fcc = empty_fcall_info_cache;
1177
1178 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f!|f!", &ack_fci, &ack_fcc, &nack_fci, &nack_fcc) == FAILURE) {
1179 return;
1180 }
1181
1182 amqp_channel_object *channel = PHP_AMQP_GET_CHANNEL(getThis());
1183
1184 php_amqp_destroy_fci(&channel->callbacks.basic_ack.fci);
1185
1186 if (ZEND_FCI_INITIALIZED(ack_fci)) {
1187 php_amqp_duplicate_fci(&ack_fci);
1188 channel->callbacks.basic_ack.fci = ack_fci;
1189 channel->callbacks.basic_ack.fcc = ack_fcc;
1190 }
1191
1192 php_amqp_destroy_fci(&channel->callbacks.basic_nack.fci);
1193
1194 if (ZEND_FCI_INITIALIZED(nack_fci)) {
1195 php_amqp_duplicate_fci(&nack_fci);
1196 channel->callbacks.basic_nack.fci = nack_fci;
1197 channel->callbacks.basic_nack.fcc = nack_fcc;
1198 }
1199 }
1200 /* }}} */
1201
1202
1203 /* {{{ proto bool amqp::waitForConfirm([double timeout=0.0])
1204 Redeliver unacknowledged messages */
PHP_METHOD(amqp_channel_class,waitForConfirm)1205 PHP_METHOD(amqp_channel_class, waitForConfirm)
1206 {
1207 amqp_channel_object *channel;
1208 amqp_channel_resource *channel_resource;
1209 amqp_method_t method;
1210
1211 int status;
1212
1213 double timeout = 0;
1214
1215 struct timeval tv = {0};
1216 struct timeval *tv_ptr = &tv;
1217
1218 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|d", &timeout) == FAILURE) {
1219 return;
1220 }
1221
1222 if (timeout < 0) {
1223 zend_throw_exception(amqp_channel_exception_class_entry, "Timeout must be greater than or equal to zero.", 0 TSRMLS_CC);
1224 return;
1225 }
1226
1227 channel = PHP_AMQP_GET_CHANNEL(getThis());
1228
1229 channel_resource = channel->channel_resource;
1230 PHP_AMQP_VERIFY_CHANNEL_RESOURCE(channel_resource, "Could not start wait loop for basic.return method.");
1231
1232 if (timeout > 0) {
1233 tv.tv_sec = (long int) timeout;
1234 tv.tv_usec = (long int) ((timeout - tv.tv_sec) * 1000000);
1235 } else {
1236 tv_ptr = NULL;
1237 }
1238
1239 assert(channel_resource->channel_id > 0);
1240
1241
1242 amqp_method_number_t expected_methods[] = { AMQP_BASIC_ACK_METHOD, AMQP_BASIC_NACK_METHOD, AMQP_BASIC_RETURN_METHOD, 0 };
1243
1244 while(1) {
1245 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1246
1247 status = amqp_simple_wait_method_list_noblock(channel_resource->connection_resource->connection_state, channel_resource->channel_id, expected_methods, &method, tv_ptr);
1248
1249 if (AMQP_STATUS_TIMEOUT == status) {
1250 zend_throw_exception(amqp_queue_exception_class_entry, "Wait timeout exceed", 0 TSRMLS_CC);
1251
1252 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1253 return;
1254 }
1255
1256 if (status != AMQP_STATUS_OK) {
1257 /* Emulate library error */
1258 amqp_rpc_reply_t res;
1259
1260 if (AMQP_RESPONSE_SERVER_EXCEPTION == status) {
1261 res.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
1262 res.reply = method;
1263 } else {
1264 res.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
1265 res.library_error = status;
1266 }
1267
1268 php_amqp_error(res, &PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource TSRMLS_CC);
1269
1270 php_amqp_zend_throw_exception(res, amqp_channel_exception_class_entry, PHP_AMQP_G(error_message), PHP_AMQP_G(error_code) TSRMLS_CC);
1271 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1272 return;
1273 }
1274
1275 switch(method.id) {
1276 case AMQP_BASIC_ACK_METHOD:
1277 status = php_amqp_handle_basic_ack(&PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource->channel_id, channel, &method TSRMLS_CC);
1278 break;
1279 case AMQP_BASIC_NACK_METHOD:
1280 status = php_amqp_handle_basic_nack(&PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource->channel_id, channel, &method TSRMLS_CC);
1281 break;
1282 case AMQP_BASIC_RETURN_METHOD:
1283 status = php_amqp_handle_basic_return(&PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource->channel_id, channel, &method TSRMLS_CC);
1284 break;
1285 default:
1286 status = AMQP_STATUS_WRONG_METHOD;
1287 }
1288
1289 if (PHP_AMQP_RESOURCE_RESPONSE_BREAK == status) {
1290 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1291 break;
1292 }
1293
1294 if (PHP_AMQP_RESOURCE_RESPONSE_OK != status) {
1295 /* Emulate library error */
1296 amqp_rpc_reply_t res;
1297 res.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
1298 res.library_error = status;
1299
1300 php_amqp_error(res, &PHP_AMQP_G(error_message), channel_resource->connection_resource, channel_resource TSRMLS_CC);
1301
1302 php_amqp_zend_throw_exception(res, amqp_queue_exception_class_entry, PHP_AMQP_G(error_message), PHP_AMQP_G(error_code) TSRMLS_CC);
1303 php_amqp_maybe_release_buffers_on_channel(channel_resource->connection_resource, channel_resource);
1304 return;
1305 }
1306 }
1307 }
1308 /* }}} */
1309
1310 /* {{{ proto amqp::getConsumers() */
PHP_METHOD(amqp_channel_class,getConsumers)1311 static PHP_METHOD(amqp_channel_class, getConsumers)
1312 {
1313 PHP5to7_READ_PROP_RV_PARAM_DECL;
1314 PHP_AMQP_NOPARAMS();
1315 PHP_AMQP_RETURN_THIS_PROP("consumers");
1316 }
1317 /* }}} */
1318
1319
1320 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class__construct, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1321 ZEND_ARG_OBJ_INFO(0, amqp_connection, AMQPConnection, 0)
1322 ZEND_END_ARG_INFO()
1323
1324 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_isConnected, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1325 ZEND_END_ARG_INFO()
1326
1327 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_close, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1328 ZEND_END_ARG_INFO()
1329
1330 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getChannelId, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1331 ZEND_END_ARG_INFO()
1332
1333 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1334 ZEND_ARG_INFO(0, size)
1335 ZEND_END_ARG_INFO()
1336
1337 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1338 ZEND_END_ARG_INFO()
1339
1340 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1341 ZEND_ARG_INFO(0, count)
1342 ZEND_END_ARG_INFO()
1343
1344 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1345 ZEND_END_ARG_INFO()
1346
1347 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setGlobalPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1348 ZEND_ARG_INFO(0, size)
1349 ZEND_END_ARG_INFO()
1350
1351 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getGlobalPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1352 ZEND_END_ARG_INFO()
1353
1354 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setGlobalPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1355 ZEND_ARG_INFO(0, count)
1356 ZEND_END_ARG_INFO()
1357
1358 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getGlobalPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1359 ZEND_END_ARG_INFO()
1360
1361 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_qos, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 2)
1362 ZEND_ARG_INFO(0, size)
1363 ZEND_ARG_INFO(0, count)
1364 ZEND_ARG_INFO(0, global)
1365 ZEND_END_ARG_INFO()
1366
1367 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_startTransaction, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1368 ZEND_END_ARG_INFO()
1369
1370 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_commitTransaction, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1371 ZEND_END_ARG_INFO()
1372
1373 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_rollbackTransaction, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1374 ZEND_END_ARG_INFO()
1375
1376 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getConnection, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1377 ZEND_END_ARG_INFO()
1378
1379 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_basicRecover, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1380 ZEND_ARG_INFO(0, requeue)
1381 ZEND_END_ARG_INFO()
1382
1383 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_confirmSelect, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1384 ZEND_END_ARG_INFO()
1385
1386 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setConfirmCallback, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1387 ZEND_ARG_INFO(0, ack_callback)
1388 ZEND_ARG_INFO(0, nack_callback)
1389 ZEND_END_ARG_INFO()
1390
1391 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_waitForConfirm, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1392 ZEND_ARG_INFO(0, timeout)
1393 ZEND_END_ARG_INFO()
1394
1395 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setReturnCallback, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
1396 ZEND_ARG_INFO(0, return_callback)
1397 ZEND_END_ARG_INFO()
1398
1399 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_waitForBasicReturn, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1400 ZEND_ARG_INFO(0, timeout)
1401 ZEND_END_ARG_INFO()
1402
1403 ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getConsumers, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
1404 ZEND_END_ARG_INFO()
1405
1406 //setConfirmsCallback
1407
1408
1409 zend_function_entry amqp_channel_class_functions[] = {
1410 PHP_ME(amqp_channel_class, __construct, arginfo_amqp_channel_class__construct, ZEND_ACC_PUBLIC)
1411 PHP_ME(amqp_channel_class, isConnected, arginfo_amqp_channel_class_isConnected, ZEND_ACC_PUBLIC)
1412 PHP_ME(amqp_channel_class, close, arginfo_amqp_channel_class_close, ZEND_ACC_PUBLIC)
1413
1414 PHP_ME(amqp_channel_class, getChannelId, arginfo_amqp_channel_class_getChannelId, ZEND_ACC_PUBLIC)
1415
1416 PHP_ME(amqp_channel_class, setPrefetchSize, arginfo_amqp_channel_class_setPrefetchSize, ZEND_ACC_PUBLIC)
1417 PHP_ME(amqp_channel_class, getPrefetchSize, arginfo_amqp_channel_class_getPrefetchSize, ZEND_ACC_PUBLIC)
1418 PHP_ME(amqp_channel_class, setPrefetchCount, arginfo_amqp_channel_class_setPrefetchCount, ZEND_ACC_PUBLIC)
1419 PHP_ME(amqp_channel_class, getPrefetchCount, arginfo_amqp_channel_class_getPrefetchCount, ZEND_ACC_PUBLIC)
1420 PHP_ME(amqp_channel_class, setGlobalPrefetchSize, arginfo_amqp_channel_class_setGlobalPrefetchSize, ZEND_ACC_PUBLIC)
1421 PHP_ME(amqp_channel_class, getGlobalPrefetchSize, arginfo_amqp_channel_class_getGlobalPrefetchSize, ZEND_ACC_PUBLIC)
1422 PHP_ME(amqp_channel_class, setGlobalPrefetchCount, arginfo_amqp_channel_class_setGlobalPrefetchCount, ZEND_ACC_PUBLIC)
1423 PHP_ME(amqp_channel_class, getGlobalPrefetchCount, arginfo_amqp_channel_class_getGlobalPrefetchCount, ZEND_ACC_PUBLIC)
1424 PHP_ME(amqp_channel_class, qos, arginfo_amqp_channel_class_qos, ZEND_ACC_PUBLIC)
1425
1426 PHP_ME(amqp_channel_class, startTransaction, arginfo_amqp_channel_class_startTransaction, ZEND_ACC_PUBLIC)
1427 PHP_ME(amqp_channel_class, commitTransaction, arginfo_amqp_channel_class_commitTransaction, ZEND_ACC_PUBLIC)
1428 PHP_ME(amqp_channel_class, rollbackTransaction, arginfo_amqp_channel_class_rollbackTransaction, ZEND_ACC_PUBLIC)
1429
1430 PHP_ME(amqp_channel_class, getConnection, arginfo_amqp_channel_class_getConnection, ZEND_ACC_PUBLIC)
1431
1432 PHP_ME(amqp_channel_class, basicRecover, arginfo_amqp_channel_class_basicRecover, ZEND_ACC_PUBLIC)
1433
1434 PHP_ME(amqp_channel_class, confirmSelect, arginfo_amqp_channel_class_confirmSelect, ZEND_ACC_PUBLIC)
1435 PHP_ME(amqp_channel_class, waitForConfirm, arginfo_amqp_channel_class_waitForConfirm, ZEND_ACC_PUBLIC)
1436 PHP_ME(amqp_channel_class, setConfirmCallback, arginfo_amqp_channel_class_setConfirmCallback, ZEND_ACC_PUBLIC)
1437
1438 PHP_ME(amqp_channel_class, setReturnCallback, arginfo_amqp_channel_class_setReturnCallback, ZEND_ACC_PUBLIC)
1439 PHP_ME(amqp_channel_class, waitForBasicReturn, arginfo_amqp_channel_class_waitForBasicReturn, ZEND_ACC_PUBLIC)
1440
1441 PHP_ME(amqp_channel_class, getConsumers, arginfo_amqp_channel_class_getConsumers, ZEND_ACC_PUBLIC)
1442
1443 {NULL, NULL, NULL}
1444 };
1445
PHP_MINIT_FUNCTION(amqp_channel)1446 PHP_MINIT_FUNCTION(amqp_channel)
1447 {
1448 zend_class_entry ce;
1449
1450 INIT_CLASS_ENTRY(ce, "AMQPChannel", amqp_channel_class_functions);
1451 ce.create_object = amqp_channel_ctor;
1452 amqp_channel_class_entry = zend_register_internal_class(&ce TSRMLS_CC);
1453
1454 zend_declare_property_null(this_ce, ZEND_STRL("connection"), ZEND_ACC_PRIVATE TSRMLS_CC);
1455
1456 zend_declare_property_null(this_ce, ZEND_STRL("prefetch_count"), ZEND_ACC_PRIVATE TSRMLS_CC);
1457 zend_declare_property_long(this_ce, ZEND_STRL("prefetch_size"), 0, ZEND_ACC_PRIVATE TSRMLS_CC);
1458 zend_declare_property_null(this_ce, ZEND_STRL("global_prefetch_count"), ZEND_ACC_PRIVATE TSRMLS_CC);
1459 zend_declare_property_null(this_ce, ZEND_STRL("global_prefetch_size"), ZEND_ACC_PRIVATE TSRMLS_CC);
1460
1461 zend_declare_property_null(this_ce, ZEND_STRL("consumers"), ZEND_ACC_PRIVATE TSRMLS_CC);
1462
1463 #if PHP_MAJOR_VERSION >=7
1464 memcpy(&amqp_channel_object_handlers, zend_get_std_object_handlers(), sizeof(zend_object_handlers));
1465
1466 amqp_channel_object_handlers.offset = XtOffsetOf(amqp_channel_object, zo);
1467 amqp_channel_object_handlers.free_obj = amqp_channel_free;
1468 #endif
1469
1470 #if ZEND_MODULE_API_NO >= 20100000
1471 amqp_channel_object_handlers.get_gc = amqp_channel_gc;
1472 #endif
1473
1474 return SUCCESS;
1475 }
1476 /*
1477 *Local variables:
1478 *tab-width: 4
1479 *c-basic-offset: 4
1480 *End:
1481 *vim600: noet sw=4 ts=4 fdm=marker
1482 *vim<600: noet sw=4 ts=4
1483 */
1484