1 /*
2 +----------------------------------------------------------------------+
3 | php-rdkafka |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2016 Arnaud Le Blanc |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
15 | Author: Arnaud Le Blanc <arnaud.lb@gmail.com> |
16 +----------------------------------------------------------------------+
17 */
18
19 #ifdef HAVE_CONFIG_H
20 #include "config.h"
21 #endif
22
23 #include "php.h"
24 #include "php_rdkafka.h"
25 #include "php_rdkafka_priv.h"
26 #include "librdkafka/rdkafka.h"
27 #include "ext/spl/spl_iterators.h"
28 #include "Zend/zend_interfaces.h"
29 #include "Zend/zend_exceptions.h"
30 #include "ext/spl/spl_exceptions.h"
31 #include "topic.h"
32 #include "queue.h"
33 #include "message.h"
34
35 static zend_object_handlers object_handlers;
36 zend_class_entry * ce_kafka_consumer_topic;
37 zend_class_entry * ce_kafka_kafka_consumer_topic;
38 zend_class_entry * ce_kafka_producer_topic;
39 zend_class_entry * ce_kafka_topic;
40
41 typedef struct _php_callback {
42 zend_fcall_info fci;
43 zend_fcall_info_cache fcc;
44 } php_callback;
45
/* {{{ kafka_topic_free
 * Free handler for topic objects.
 *
 * When the topic is bound to a Kafka object (zrk is defined and rkt is set),
 * the topic's entry is removed from that object's `topics` table, keyed by the
 * address of the topic structure. The standard object destructor and the
 * custom-object release then run unconditionally. */
static void kafka_topic_free(zend_object *object TSRMLS_DC)
{
    kafka_topic_object *topic = get_custom_object(kafka_topic_object, object);
    kafka_object *owner = NULL;

    if (ZE_ISDEF(topic->zrk) && topic->rkt) {
        owner = get_kafka_object(P_ZEVAL(topic->zrk) TSRMLS_CC);
    }

    if (owner) {
        zend_hash_index_del(&owner->topics, (zend_ulong)topic);
    }

    zend_object_std_dtor(&topic->std TSRMLS_CC);

    free_custom_object(topic);
}
/* }}} */
62
kafka_topic_new(zend_class_entry * class_type TSRMLS_DC)63 static zend_object_value kafka_topic_new(zend_class_entry *class_type TSRMLS_DC) /* {{{ */
64 {
65 zend_object_value retval;
66 kafka_topic_object *intern;
67
68 intern = alloc_object(intern, class_type);
69 zend_object_std_init(&intern->std, class_type TSRMLS_CC);
70 object_properties_init(&intern->std, class_type);
71
72 STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_topic_free, NULL);
73 SET_OBJECT_HANDLERS(retval, &object_handlers);
74
75 return retval;
76 }
77 /* }}} */
78
79
/* Trampoline passed to rd_kafka_consume_callback().
 *
 * Wraps `msg` in a userland message object and invokes the PHP callable stored
 * in `opaque` (a php_callback *) with that object as its only argument.
 * Does nothing when `opaque` is NULL. */
static void consume_callback(rd_kafka_message_t *msg, void *opaque)
{
    php_callback *cb = (php_callback *) opaque;
    zeval args[1];
    TSRMLS_FETCH();

    /* cb and opaque are the same pointer, so a single check is enough. */
    if (!cb) {
        return;
    }

    MAKE_STD_ZEVAL(args[0]);

    kafka_message_new(P_ZEVAL(args[0]), msg TSRMLS_CC);

    rdkafka_call_function(&cb->fci, &cb->fcc, NULL, 1, args TSRMLS_CC);

    zval_ptr_dtor(&args[0]);
}
102
get_kafka_topic_object(zval * zrkt TSRMLS_DC)103 kafka_topic_object * get_kafka_topic_object(zval *zrkt TSRMLS_DC)
104 {
105 kafka_topic_object *orkt = get_custom_object_zval(kafka_topic_object, zrkt);
106
107 if (!orkt->rkt) {
108 zend_throw_exception_ex(NULL, 0 TSRMLS_CC, "RdKafka\\Topic::__construct() has not been called" TSRMLS_CC);
109 return NULL;
110 }
111
112 return orkt;
113 }
114
115 /* {{{ proto RdKafka\ConsumerTopic::consumeCallback([int $partition, int timeout_ms, mixed $callback]) */
116
117 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_callback, 0, 0, 3)
118 ZEND_ARG_INFO(0, partition)
119 ZEND_ARG_INFO(0, timeout_ms)
120 ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()121 ZEND_END_ARG_INFO()
122
123 PHP_METHOD(RdKafka__ConsumerTopic, consumeCallback)
124 {
125 php_callback cb;
126 zend_long partition;
127 zend_long timeout_ms;
128 long result;
129 kafka_topic_object *intern;
130
131 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "llf", &partition, &timeout_ms, &cb.fci, &cb.fcc) == FAILURE) {
132 return;
133 }
134
135 if (partition < 0 || partition > 0x7FFFFFFF) {
136 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
137 return;
138 }
139
140 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
141 if (!intern) {
142 return;
143 }
144
145 Z_ADDREF_P(P_ZEVAL(cb.fci.function_name));
146
147 result = rd_kafka_consume_callback(intern->rkt, partition, timeout_ms, consume_callback, &cb);
148
149 zval_ptr_dtor(&cb.fci.function_name);
150
151 RETURN_LONG(result);
152 }
153 /* }}} */
154
155 /* {{{ proto void RdKafka\ConsumerTopic::consumeQueueStart(int $partition, int $offset, RdKafka\Queue $queue)
156 * Same as consumeStart(), but re-routes incoming messages to the provided queue */
157
158 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_queue_start, 0, 0, 3)
159 ZEND_ARG_INFO(0, partition)
160 ZEND_ARG_INFO(0, offset)
161 ZEND_ARG_INFO(0, queue)
ZEND_END_ARG_INFO()162 ZEND_END_ARG_INFO()
163
164 PHP_METHOD(RdKafka__ConsumerTopic, consumeQueueStart)
165 {
166 zval *zrkqu;
167 kafka_topic_object *intern;
168 kafka_queue_object *queue_intern;
169 zend_long partition;
170 zend_long offset;
171 int ret;
172 rd_kafka_resp_err_t err;
173 kafka_object *kafka_intern;
174
175 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "llO", &partition, &offset, &zrkqu, ce_kafka_queue) == FAILURE) {
176 return;
177 }
178
179 if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
180 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
181 return;
182 }
183
184 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
185 if (!intern) {
186 return;
187 }
188
189 queue_intern = get_kafka_queue_object(zrkqu TSRMLS_CC);
190 if (!queue_intern) {
191 return;
192 }
193
194 kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
195 if (!kafka_intern) {
196 return;
197 }
198
199 if (is_consuming_toppar(kafka_intern, intern->rkt, partition)) {
200 zend_throw_exception_ex(
201 ce_kafka_exception,
202 0 TSRMLS_CC,
203 "%s:" ZEND_LONG_FMT " is already being consumed by the same Consumer instance",
204 rd_kafka_topic_name(intern->rkt),
205 partition
206 );
207 return;
208 }
209
210 ret = rd_kafka_consume_start_queue(intern->rkt, partition, offset, queue_intern->rkqu);
211
212 if (ret == -1) {
213 err = rd_kafka_last_error();
214 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
215 return;
216 }
217
218 add_consuming_toppar(kafka_intern, intern->rkt, partition);
219 }
220 /* }}} */
221
222 /* {{{ proto void RdKafka\ConsumerTopic::consumeStart(int partition, int offset)
223 Start consuming messages */
224
225 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_start, 0, 0, 2)
226 ZEND_ARG_INFO(0, partition)
227 ZEND_ARG_INFO(0, offset)
ZEND_END_ARG_INFO()228 ZEND_END_ARG_INFO()
229
230 PHP_METHOD(RdKafka__ConsumerTopic, consumeStart)
231 {
232 kafka_topic_object *intern;
233 zend_long partition;
234 zend_long offset;
235 int ret;
236 rd_kafka_resp_err_t err;
237 kafka_object *kafka_intern;
238
239 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll", &partition, &offset) == FAILURE) {
240 return;
241 }
242
243 if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
244 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
245 return;
246 }
247
248 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
249 if (!intern) {
250 return;
251 }
252
253 kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
254 if (!kafka_intern) {
255 return;
256 }
257
258 if (is_consuming_toppar(kafka_intern, intern->rkt, partition)) {
259 zend_throw_exception_ex(
260 ce_kafka_exception,
261 0 TSRMLS_CC,
262 "%s:" ZEND_LONG_FMT " is already being consumed by the same Consumer instance",
263 rd_kafka_topic_name(intern->rkt),
264 partition
265 );
266 return;
267 }
268
269 ret = rd_kafka_consume_start(intern->rkt, partition, offset);
270
271 if (ret == -1) {
272 err = rd_kafka_last_error();
273 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
274 return;
275 }
276
277 add_consuming_toppar(kafka_intern, intern->rkt, partition);
278 }
279 /* }}} */
280
281 /* {{{ proto void RdKafka\ConsumerTopic::consumeStop(int partition)
282 Stop consuming messages */
283
284 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_stop, 0, 0, 1)
285 ZEND_ARG_INFO(0, partition)
ZEND_END_ARG_INFO()286 ZEND_END_ARG_INFO()
287
288 PHP_METHOD(RdKafka__ConsumerTopic, consumeStop)
289 {
290 kafka_topic_object *intern;
291 zend_long partition;
292 int ret;
293 rd_kafka_resp_err_t err;
294 kafka_object *kafka_intern;
295
296 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &partition) == FAILURE) {
297 return;
298 }
299
300 if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
301 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
302 return;
303 }
304
305 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
306 if (!intern) {
307 return;
308 }
309
310 kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
311 if (!kafka_intern) {
312 return;
313 }
314
315 ret = rd_kafka_consume_stop(intern->rkt, partition);
316
317 if (ret == -1) {
318 err = rd_kafka_last_error();
319 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
320 return;
321 }
322
323 del_consuming_toppar(kafka_intern, intern->rkt, partition);
324 }
325 /* }}} */
326
327 /* {{{ proto RdKafka\Message RdKafka\ConsumerTopic::consume(int $partition, int timeout_ms)
328 Consume a single message from partition */
329
330 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume, 0, 0, 2)
331 ZEND_ARG_INFO(0, partition)
332 ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()333 ZEND_END_ARG_INFO()
334
335 PHP_METHOD(RdKafka__ConsumerTopic, consume)
336 {
337 kafka_topic_object *intern;
338 zend_long partition;
339 zend_long timeout_ms;
340 rd_kafka_message_t *message;
341 rd_kafka_resp_err_t err;
342
343 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll", &partition, &timeout_ms) == FAILURE) {
344 return;
345 }
346
347 if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
348 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
349 return;
350 }
351
352 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
353 if (!intern) {
354 return;
355 }
356
357 message = rd_kafka_consume(intern->rkt, partition, timeout_ms);
358
359 if (!message) {
360 err = rd_kafka_last_error();
361 if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
362 return;
363 }
364 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
365 return;
366 }
367
368 kafka_message_new(return_value, message TSRMLS_CC);
369
370 rd_kafka_message_destroy(message);
371 }
372 /* }}} */
373
374 /* {{{ proto RdKafka\Message RdKafka\ConsumerTopic::consumeBatch(int $partition, int $timeout_ms, int $batch_size)
375 Consume a batch of messages from a partition */
376
377 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_batch, 0, 0, 3)
378 ZEND_ARG_INFO(0, partition)
379 ZEND_ARG_INFO(0, timeout_ms)
380 ZEND_ARG_INFO(0, batch_size)
ZEND_END_ARG_INFO()381 ZEND_END_ARG_INFO()
382
383 PHP_METHOD(RdKafka__ConsumerTopic, consumeBatch)
384 {
385 kafka_topic_object *intern;
386 zend_long partition, timeout_ms, batch_size;
387 long result, i;
388 rd_kafka_message_t **rkmessages;
389 rd_kafka_resp_err_t err;
390
391 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lll", &partition, &timeout_ms, &batch_size) == FAILURE) {
392 return;
393 }
394
395 if (0 >= batch_size) {
396 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for batch_size", batch_size TSRMLS_CC);
397 return;
398 }
399
400 if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
401 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
402 return;
403 }
404
405 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
406 if (!intern) {
407 return;
408 }
409
410 rkmessages = malloc(sizeof(*rkmessages) * batch_size);
411
412 result = rd_kafka_consume_batch(intern->rkt, partition, timeout_ms, rkmessages, batch_size);
413
414 if (result == -1) {
415 free(rkmessages);
416 err = rd_kafka_last_error();
417 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
418 return;
419 }
420
421 if (result >= 0) {
422 kafka_message_list_to_array(return_value, rkmessages, result TSRMLS_CC);
423 for (i = 0; i < result; ++i) {
424 rd_kafka_message_destroy(rkmessages[i]);
425 }
426 }
427
428 free(rkmessages);
429 }
430 /* }}} */
431
432 /* {{{ proto void RdKafka\ConsumerTopic::offsetStore(int partition, int offset) */
433
434 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_offset_store, 0, 0, 2)
435 ZEND_ARG_INFO(0, partition)
436 ZEND_ARG_INFO(0, offset)
ZEND_END_ARG_INFO()437 ZEND_END_ARG_INFO()
438
439 PHP_METHOD(RdKafka__ConsumerTopic, offsetStore)
440 {
441 kafka_topic_object *intern;
442 zend_long partition;
443 zend_long offset;
444 rd_kafka_resp_err_t err;
445
446 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll", &partition, &offset) == FAILURE) {
447 return;
448 }
449
450 if (partition < 0 || partition > 0x7FFFFFFF) {
451 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
452 return;
453 }
454
455 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
456 if (!intern) {
457 return;
458 }
459
460 err = rd_kafka_offset_store(intern->rkt, partition, offset);
461
462 if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
463 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
464 return;
465 }
466 }
467 /* }}} */
468
469 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka___private_construct, 0, 0, 0)
470 ZEND_END_ARG_INFO()
471
472 static const zend_function_entry kafka_consumer_topic_fe[] = {
473 PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
474 PHP_ME(RdKafka__ConsumerTopic, consumeQueueStart, arginfo_kafka_consume_queue_start, ZEND_ACC_PUBLIC)
475 PHP_ME(RdKafka__ConsumerTopic, consumeCallback, arginfo_kafka_consume_callback, ZEND_ACC_PUBLIC)
476 PHP_ME(RdKafka__ConsumerTopic, consumeStart, arginfo_kafka_consume_start, ZEND_ACC_PUBLIC)
477 PHP_ME(RdKafka__ConsumerTopic, consumeStop, arginfo_kafka_consume_stop, ZEND_ACC_PUBLIC)
478 PHP_ME(RdKafka__ConsumerTopic, consume, arginfo_kafka_consume, ZEND_ACC_PUBLIC)
479 PHP_ME(RdKafka__ConsumerTopic, consumeBatch, arginfo_kafka_consume_batch, ZEND_ACC_PUBLIC)
480 PHP_ME(RdKafka__ConsumerTopic, offsetStore, arginfo_kafka_offset_store, ZEND_ACC_PUBLIC)
481 PHP_FE_END
482 };
483
484 static const zend_function_entry kafka_kafka_consumer_topic_fe[] = {
485 PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
486 PHP_ME(RdKafka__ConsumerTopic, offsetStore, arginfo_kafka_offset_store, ZEND_ACC_PUBLIC)
487 PHP_FE_END
488 };
489
490 /* {{{ proto void RdKafka\ProducerTopic::produce(int $partition, int $msgflags[, string $payload, string $key])
491 Produce and send a single message to broker. */
492
493 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_produce, 0, 0, 2)
494 ZEND_ARG_INFO(0, partition)
495 ZEND_ARG_INFO(0, msgflags)
496 ZEND_ARG_INFO(0, payload)
497 ZEND_ARG_INFO(0, key)
ZEND_END_ARG_INFO()498 ZEND_END_ARG_INFO()
499
500 PHP_METHOD(RdKafka__ProducerTopic, produce)
501 {
502 zend_long partition;
503 zend_long msgflags;
504 char *payload = NULL;
505 arglen_t payload_len = 0;
506 char *key = NULL;
507 arglen_t key_len = 0;
508 int ret;
509 rd_kafka_resp_err_t err;
510 kafka_topic_object *intern;
511
512 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!", &partition, &msgflags, &payload, &payload_len, &key, &key_len) == FAILURE) {
513 return;
514 }
515
516 if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
517 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
518 return;
519 }
520
521 if (msgflags != 0 && msgflags != RD_KAFKA_MSG_F_BLOCK) {
522 zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Invalid value '%ld' for $msgflags", msgflags TSRMLS_CC);
523 return;
524 }
525
526 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
527
528 ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, NULL);
529
530 if (ret == -1) {
531 err = rd_kafka_last_error();
532 zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
533 return;
534 }
535 }
536 /* }}} */
537
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
/* {{{ proto void RdKafka\ProducerTopic::producev(int $partition, int $msgflags[, string $payload, string $key, array $headers, int $timestamp_ms])
   Produce and send a single message to broker (with headers possibility and timestamp).
   $payload, $key, $headers and $timestamp_ms may be null; a null timestamp is
   sent as 0. Header values are converted to strings in place.
   Throws InvalidArgumentException for an out-of-range partition or
   unsupported $msgflags, and RdKafka\Exception when rd_kafka_producev() fails. */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_producev, 0, 0, 2)
    ZEND_ARG_INFO(0, partition)
    ZEND_ARG_INFO(0, msgflags)
    ZEND_ARG_INFO(0, payload)
    ZEND_ARG_INFO(0, key)
    ZEND_ARG_INFO(0, headers)
    ZEND_ARG_INFO(0, timestamp_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__ProducerTopic, producev)
{
    zend_long partition;
    zend_long msgflags;
    char *payload = NULL;
    arglen_t payload_len = 0;
    char *key = NULL;
    arglen_t key_len = 0;
    rd_kafka_resp_err_t err;
    kafka_topic_object *intern;
    kafka_object *kafka_intern;
    HashTable *headersParam = NULL;
    HashPosition headersParamPos;
    char *header_key;
    zeval *header_value;
    rd_kafka_headers_t *headers;
    zend_long timestamp_ms = 0;
    zend_bool timestamp_ms_is_null = 0;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!h!l!", &partition, &msgflags, &payload, &payload_len, &key, &key_len, &headersParam, &timestamp_ms, &timestamp_ms_is_null) == FAILURE) {
        return;
    }

    if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
        /* ZEND_LONG_FMT matches zend_long on every platform, unlike "%ld". */
        zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '" ZEND_LONG_FMT "' for $partition", partition);
        return;
    }

    if (msgflags != 0 && msgflags != RD_KAFKA_MSG_F_BLOCK) {
        zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Invalid value '" ZEND_LONG_FMT "' for $msgflags", msgflags);
        return;
    }

    if (timestamp_ms_is_null == 1) {
        timestamp_ms = 0;
    }

    intern = get_kafka_topic_object(getThis() TSRMLS_CC);
    /* get_kafka_topic_object() returns NULL (with an exception pending) for an
     * unconstructed object; bail out instead of dereferencing it. */
    if (!intern) {
        return;
    }

    /* Resolve the owning Kafka object before allocating the header list, so
     * this early return cannot leak it. */
    kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
    if (!kafka_intern) {
        return;
    }

    if (headersParam != NULL && zend_hash_num_elements(headersParam) > 0) {
        headers = rd_kafka_headers_new(zend_hash_num_elements(headersParam));
        /* Iteration stops at the first entry without data or without a key
         * returned by rdkafka_hash_get_current_key_ex(). */
        for (zend_hash_internal_pointer_reset_ex(headersParam, &headersParamPos);
                (header_value = rdkafka_hash_get_current_data_ex(headersParam, &headersParamPos)) != NULL &&
                (header_key = rdkafka_hash_get_current_key_ex(headersParam, &headersParamPos)) != NULL;
                zend_hash_move_forward_ex(headersParam, &headersParamPos)) {
            convert_to_string_ex(header_value);
            rd_kafka_header_add(
                headers,
                header_key,
                -1, /* Auto detect header title length */
                Z_STRVAL_P(ZEVAL(header_value)),
                Z_STRLEN_P(ZEVAL(header_value))
            );
        }
    } else {
        headers = rd_kafka_headers_new(0);
    }

    err = rd_kafka_producev(
            kafka_intern->rk,
            RD_KAFKA_V_RKT(intern->rkt),
            RD_KAFKA_V_PARTITION(partition),
            RD_KAFKA_V_MSGFLAGS(msgflags | RD_KAFKA_MSG_F_COPY),
            RD_KAFKA_V_VALUE(payload, payload_len),
            RD_KAFKA_V_KEY(key, key_len),
            RD_KAFKA_V_TIMESTAMP(timestamp_ms),
            RD_KAFKA_V_HEADERS(headers),
            RD_KAFKA_V_END
    );

    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        /* The header list is released here only when producing failed. */
        rd_kafka_headers_destroy(headers);
        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
        return;
    }
}
/* }}} */
#endif
634
635 static const zend_function_entry kafka_producer_topic_fe[] = {
636 PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
637 PHP_ME(RdKafka__ProducerTopic, produce, arginfo_kafka_produce, ZEND_ACC_PUBLIC)
638 #ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
639 PHP_ME(RdKafka__ProducerTopic, producev, arginfo_kafka_producev, ZEND_ACC_PUBLIC)
640 #endif
641 PHP_FE_END
642 };
643
644 /* {{{ proto string RdKafka\Topic::getName() */
645
646 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_topic_get_name, 0, 0, 0)
ZEND_END_ARG_INFO()647 ZEND_END_ARG_INFO()
648
649 PHP_METHOD(RdKafka__Topic, getName)
650 {
651 kafka_topic_object *intern;
652
653 if (zend_parse_parameters_none() == FAILURE) {
654 return;
655 }
656
657 intern = get_kafka_topic_object(getThis() TSRMLS_CC);
658 if (!intern) {
659 return;
660 }
661
662 RDKAFKA_RETURN_STRING(rd_kafka_topic_name(intern->rkt));
663 }
664 /* }}} */
665
666 static const zend_function_entry kafka_topic_fe[] = {
667 PHP_ME(RdKafka__Topic, getName, arginfo_kafka_topic_get_name, ZEND_ACC_PUBLIC)
668 PHP_FE_END
669 };
670
kafka_topic_minit(TSRMLS_D)671 void kafka_topic_minit(TSRMLS_D) { /* {{{ */
672
673 zend_class_entry ce;
674
675 memcpy(&object_handlers, zend_get_std_object_handlers(), sizeof(zend_object_handlers));
676 object_handlers.clone_obj = NULL;
677 set_object_handler_free_obj(&object_handlers, kafka_topic_free);
678 set_object_handler_offset(&object_handlers, XtOffsetOf(kafka_topic_object, std));
679
680 INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Topic", kafka_topic_fe);
681 ce_kafka_topic = zend_register_internal_class(&ce TSRMLS_CC);
682 ce_kafka_topic->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS;
683 ce_kafka_topic->create_object = kafka_topic_new;
684
685 INIT_NS_CLASS_ENTRY(ce, "RdKafka", "ConsumerTopic", kafka_consumer_topic_fe);
686 ce_kafka_consumer_topic = rdkafka_register_internal_class_ex(&ce, ce_kafka_topic TSRMLS_CC);
687
688 INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumerTopic", kafka_kafka_consumer_topic_fe);
689 ce_kafka_kafka_consumer_topic = rdkafka_register_internal_class_ex(&ce, ce_kafka_topic TSRMLS_CC);
690
691 INIT_NS_CLASS_ENTRY(ce, "RdKafka", "ProducerTopic", kafka_producer_topic_fe);
692 ce_kafka_producer_topic = rdkafka_register_internal_class_ex(&ce, ce_kafka_topic TSRMLS_CC);
693 } /* }}} */
694