1 /*
2   +----------------------------------------------------------------------+
3   | php-rdkafka                                                          |
4   +----------------------------------------------------------------------+
5   | Copyright (c) 2016 Arnaud Le Blanc                                   |
6   +----------------------------------------------------------------------+
7   | This source file is subject to version 3.01 of the PHP license,      |
8   | that is bundled with this package in the file LICENSE, and is        |
9   | available through the world-wide-web at the following url:           |
10   | http://www.php.net/license/3_01.txt                                  |
11   | If you did not receive a copy of the PHP license and are unable to   |
12   | obtain it through the world-wide-web, please send a note to          |
13   | license@php.net so we can mail you a copy immediately.               |
14   +----------------------------------------------------------------------+
15   | Author: Arnaud Le Blanc <arnaud.lb@gmail.com>                        |
16   +----------------------------------------------------------------------+
17 */
18 
19 #ifdef HAVE_CONFIG_H
20 #include "config.h"
21 #endif
22 
23 #include "php.h"
24 #include "php_rdkafka.h"
25 #include "php_rdkafka_priv.h"
26 #include "librdkafka/rdkafka.h"
27 #include "ext/spl/spl_iterators.h"
28 #include "Zend/zend_interfaces.h"
29 #include "Zend/zend_exceptions.h"
30 #include "ext/spl/spl_exceptions.h"
31 #include "topic.h"
32 #include "queue.h"
33 #include "message.h"
34 
35 static zend_object_handlers object_handlers;
36 zend_class_entry * ce_kafka_consumer_topic;
37 zend_class_entry * ce_kafka_kafka_consumer_topic;
38 zend_class_entry * ce_kafka_producer_topic;
39 zend_class_entry * ce_kafka_topic;
40 
41 typedef struct _php_callback {
42     zend_fcall_info fci;
43     zend_fcall_info_cache fcc;
44 } php_callback;
45 
/* Free handler for topic objects. When the topic has both an owner zval and
 * an rd_kafka topic handle, its entry is removed from the owner's `topics`
 * table; afterwards the standard object part is destroyed and the custom
 * object released. */
static void kafka_topic_free(zend_object *object TSRMLS_DC) /* {{{ */
{
    kafka_topic_object *topic = get_custom_object(kafka_topic_object, object);
    kafka_object *owner = NULL;

    if (ZE_ISDEF(topic->zrk) && topic->rkt) {
        owner = get_kafka_object(P_ZEVAL(topic->zrk) TSRMLS_CC);
    }

    if (owner) {
        /* The table is indexed by the address of the topic object. */
        zend_hash_index_del(&owner->topics, (zend_ulong)topic);
    }

    zend_object_std_dtor(&topic->std TSRMLS_CC);

    free_custom_object(topic);
}
/* }}} */
62 
kafka_topic_new(zend_class_entry * class_type TSRMLS_DC)63 static zend_object_value kafka_topic_new(zend_class_entry *class_type TSRMLS_DC) /* {{{ */
64 {
65     zend_object_value retval;
66     kafka_topic_object *intern;
67 
68     intern = alloc_object(intern, class_type);
69     zend_object_std_init(&intern->std, class_type TSRMLS_CC);
70     object_properties_init(&intern->std, class_type);
71 
72     STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_topic_free, NULL);
73     SET_OBJECT_HANDLERS(retval, &object_handlers);
74 
75     return retval;
76 }
77 /* }}} */
78 
79 
/* Message callback passed to rd_kafka_consume_callback(). Wraps `msg` in a
 * PHP message value and calls the PHP callable carried in `opaque`
 * (a php_callback *) with it as the only argument. Does nothing when
 * `opaque` is NULL. */
static void consume_callback(rd_kafka_message_t *msg, void *opaque)
{
    php_callback *cb = (php_callback*) opaque;
    zeval args[1];
    TSRMLS_FETCH();

    /* `cb` is `opaque` after a cast, so a single NULL check covers both. */
    if (!cb) {
        return;
    }

    MAKE_STD_ZEVAL(args[0]);

    kafka_message_new(P_ZEVAL(args[0]), msg TSRMLS_CC);

    rdkafka_call_function(&cb->fci, &cb->fcc, NULL, 1, args TSRMLS_CC);

    zval_ptr_dtor(&args[0]);
}
102 
get_kafka_topic_object(zval * zrkt TSRMLS_DC)103 kafka_topic_object * get_kafka_topic_object(zval *zrkt TSRMLS_DC)
104 {
105     kafka_topic_object *orkt = get_custom_object_zval(kafka_topic_object, zrkt);
106 
107     if (!orkt->rkt) {
108         zend_throw_exception_ex(NULL, 0 TSRMLS_CC, "RdKafka\\Topic::__construct() has not been called" TSRMLS_CC);
109         return NULL;
110     }
111 
112     return orkt;
113 }
114 
115 /* {{{ proto RdKafka\ConsumerTopic::consumeCallback([int $partition, int timeout_ms, mixed $callback]) */
116 
117 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_callback, 0, 0, 3)
118     ZEND_ARG_INFO(0, partition)
119     ZEND_ARG_INFO(0, timeout_ms)
120     ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()121 ZEND_END_ARG_INFO()
122 
123 PHP_METHOD(RdKafka__ConsumerTopic, consumeCallback)
124 {
125     php_callback cb;
126     zend_long partition;
127     zend_long timeout_ms;
128     long result;
129     kafka_topic_object *intern;
130 
131     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "llf", &partition, &timeout_ms, &cb.fci, &cb.fcc) == FAILURE) {
132         return;
133     }
134 
135     if (partition < 0 || partition > 0x7FFFFFFF) {
136         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
137         return;
138     }
139 
140     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
141     if (!intern) {
142         return;
143     }
144 
145     Z_ADDREF_P(P_ZEVAL(cb.fci.function_name));
146 
147     result = rd_kafka_consume_callback(intern->rkt, partition, timeout_ms, consume_callback, &cb);
148 
149     zval_ptr_dtor(&cb.fci.function_name);
150 
151     RETURN_LONG(result);
152 }
153 /* }}} */
154 
155 /* {{{ proto void RdKafka\ConsumerTopic::consumeQueueStart(int $partition, int $offset, RdKafka\Queue $queue)
156  * Same as consumeStart(), but re-routes incoming messages to the provided queue */
157 
158 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_queue_start, 0, 0, 3)
159     ZEND_ARG_INFO(0, partition)
160     ZEND_ARG_INFO(0, offset)
161     ZEND_ARG_INFO(0, queue)
ZEND_END_ARG_INFO()162 ZEND_END_ARG_INFO()
163 
164 PHP_METHOD(RdKafka__ConsumerTopic, consumeQueueStart)
165 {
166     zval *zrkqu;
167     kafka_topic_object *intern;
168     kafka_queue_object *queue_intern;
169     zend_long partition;
170     zend_long offset;
171     int ret;
172     rd_kafka_resp_err_t err;
173     kafka_object *kafka_intern;
174 
175     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "llO", &partition, &offset, &zrkqu, ce_kafka_queue) == FAILURE) {
176         return;
177     }
178 
179     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
180         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
181         return;
182     }
183 
184     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
185     if (!intern) {
186         return;
187     }
188 
189     queue_intern = get_kafka_queue_object(zrkqu TSRMLS_CC);
190     if (!queue_intern) {
191         return;
192     }
193 
194     kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
195     if (!kafka_intern) {
196         return;
197     }
198 
199     if (is_consuming_toppar(kafka_intern, intern->rkt, partition)) {
200         zend_throw_exception_ex(
201             ce_kafka_exception,
202             0 TSRMLS_CC,
203             "%s:" ZEND_LONG_FMT " is already being consumed by the same Consumer instance",
204             rd_kafka_topic_name(intern->rkt),
205             partition
206         );
207         return;
208     }
209 
210     ret = rd_kafka_consume_start_queue(intern->rkt, partition, offset, queue_intern->rkqu);
211 
212     if (ret == -1) {
213         err = rd_kafka_last_error();
214         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
215         return;
216     }
217 
218     add_consuming_toppar(kafka_intern, intern->rkt, partition);
219 }
220 /* }}} */
221 
222 /* {{{ proto void RdKafka\ConsumerTopic::consumeStart(int partition, int offset)
223    Start consuming messages */
224 
225 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_start, 0, 0, 2)
226     ZEND_ARG_INFO(0, partition)
227     ZEND_ARG_INFO(0, offset)
ZEND_END_ARG_INFO()228 ZEND_END_ARG_INFO()
229 
230 PHP_METHOD(RdKafka__ConsumerTopic, consumeStart)
231 {
232     kafka_topic_object *intern;
233     zend_long partition;
234     zend_long offset;
235     int ret;
236     rd_kafka_resp_err_t err;
237     kafka_object *kafka_intern;
238 
239     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll", &partition, &offset) == FAILURE) {
240         return;
241     }
242 
243     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
244         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
245         return;
246     }
247 
248     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
249     if (!intern) {
250         return;
251     }
252 
253     kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
254     if (!kafka_intern) {
255         return;
256     }
257 
258     if (is_consuming_toppar(kafka_intern, intern->rkt, partition)) {
259         zend_throw_exception_ex(
260             ce_kafka_exception,
261             0 TSRMLS_CC,
262             "%s:" ZEND_LONG_FMT " is already being consumed by the same Consumer instance",
263             rd_kafka_topic_name(intern->rkt),
264             partition
265         );
266         return;
267     }
268 
269     ret = rd_kafka_consume_start(intern->rkt, partition, offset);
270 
271     if (ret == -1) {
272         err = rd_kafka_last_error();
273         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
274         return;
275     }
276 
277     add_consuming_toppar(kafka_intern, intern->rkt, partition);
278 }
279 /* }}} */
280 
281 /* {{{ proto void RdKafka\ConsumerTopic::consumeStop(int partition)
282    Stop consuming messages */
283 
284 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_stop, 0, 0, 1)
285     ZEND_ARG_INFO(0, partition)
ZEND_END_ARG_INFO()286 ZEND_END_ARG_INFO()
287 
288 PHP_METHOD(RdKafka__ConsumerTopic, consumeStop)
289 {
290     kafka_topic_object *intern;
291     zend_long partition;
292     int ret;
293     rd_kafka_resp_err_t err;
294     kafka_object *kafka_intern;
295 
296     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &partition) == FAILURE) {
297         return;
298     }
299 
300     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
301         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
302         return;
303     }
304 
305     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
306     if (!intern) {
307         return;
308     }
309 
310     kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
311     if (!kafka_intern) {
312         return;
313     }
314 
315     ret = rd_kafka_consume_stop(intern->rkt, partition);
316 
317     if (ret == -1) {
318         err = rd_kafka_last_error();
319         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
320         return;
321     }
322 
323     del_consuming_toppar(kafka_intern, intern->rkt, partition);
324 }
325 /* }}} */
326 
327 /* {{{ proto RdKafka\Message RdKafka\ConsumerTopic::consume(int $partition, int timeout_ms)
328    Consume a single message from partition */
329 
330 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume, 0, 0, 2)
331     ZEND_ARG_INFO(0, partition)
332     ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()333 ZEND_END_ARG_INFO()
334 
335 PHP_METHOD(RdKafka__ConsumerTopic, consume)
336 {
337     kafka_topic_object *intern;
338     zend_long partition;
339     zend_long timeout_ms;
340     rd_kafka_message_t *message;
341     rd_kafka_resp_err_t err;
342 
343     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll", &partition, &timeout_ms) == FAILURE) {
344         return;
345     }
346 
347     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
348         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
349         return;
350     }
351 
352     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
353     if (!intern) {
354         return;
355     }
356 
357     message = rd_kafka_consume(intern->rkt, partition, timeout_ms);
358 
359     if (!message) {
360         err = rd_kafka_last_error();
361         if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
362             return;
363         }
364         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
365         return;
366     }
367 
368     kafka_message_new(return_value, message TSRMLS_CC);
369 
370     rd_kafka_message_destroy(message);
371 }
372 /* }}} */
373 
374 /* {{{ proto RdKafka\Message RdKafka\ConsumerTopic::consumeBatch(int $partition, int $timeout_ms, int $batch_size)
375    Consume a batch of messages from a partition */
376 
377 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consume_batch, 0, 0, 3)
378     ZEND_ARG_INFO(0, partition)
379     ZEND_ARG_INFO(0, timeout_ms)
380     ZEND_ARG_INFO(0, batch_size)
ZEND_END_ARG_INFO()381 ZEND_END_ARG_INFO()
382 
383 PHP_METHOD(RdKafka__ConsumerTopic, consumeBatch)
384 {
385     kafka_topic_object *intern;
386     zend_long partition, timeout_ms, batch_size;
387     long result, i;
388     rd_kafka_message_t **rkmessages;
389     rd_kafka_resp_err_t err;
390 
391     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lll", &partition, &timeout_ms, &batch_size) == FAILURE) {
392         return;
393     }
394 
395     if (0 >= batch_size) {
396         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for batch_size", batch_size TSRMLS_CC);
397         return;
398     }
399 
400     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
401         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
402         return;
403     }
404 
405     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
406     if (!intern) {
407         return;
408     }
409 
410     rkmessages = malloc(sizeof(*rkmessages) * batch_size);
411 
412     result = rd_kafka_consume_batch(intern->rkt, partition, timeout_ms, rkmessages, batch_size);
413 
414     if (result == -1) {
415         free(rkmessages);
416         err = rd_kafka_last_error();
417         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
418         return;
419     }
420 
421     if (result >= 0) {
422         kafka_message_list_to_array(return_value, rkmessages, result TSRMLS_CC);
423         for (i = 0; i < result; ++i) {
424             rd_kafka_message_destroy(rkmessages[i]);
425         }
426     }
427 
428     free(rkmessages);
429 }
430 /* }}} */
431 
432 /* {{{ proto void RdKafka\ConsumerTopic::offsetStore(int partition, int offset) */
433 
434 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_offset_store, 0, 0, 2)
435     ZEND_ARG_INFO(0, partition)
436     ZEND_ARG_INFO(0, offset)
ZEND_END_ARG_INFO()437 ZEND_END_ARG_INFO()
438 
439 PHP_METHOD(RdKafka__ConsumerTopic, offsetStore)
440 {
441     kafka_topic_object *intern;
442     zend_long partition;
443     zend_long offset;
444     rd_kafka_resp_err_t err;
445 
446     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll", &partition, &offset) == FAILURE) {
447         return;
448     }
449 
450     if (partition < 0 || partition > 0x7FFFFFFF) {
451         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
452         return;
453     }
454 
455     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
456     if (!intern) {
457         return;
458     }
459 
460     err = rd_kafka_offset_store(intern->rkt, partition, offset);
461 
462     if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
463         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
464         return;
465     }
466 }
467 /* }}} */
468 
469 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka___private_construct, 0, 0, 0)
470 ZEND_END_ARG_INFO()
471 
472 static const zend_function_entry kafka_consumer_topic_fe[] = {
473     PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
474     PHP_ME(RdKafka__ConsumerTopic, consumeQueueStart, arginfo_kafka_consume_queue_start, ZEND_ACC_PUBLIC)
475     PHP_ME(RdKafka__ConsumerTopic, consumeCallback, arginfo_kafka_consume_callback, ZEND_ACC_PUBLIC)
476     PHP_ME(RdKafka__ConsumerTopic, consumeStart, arginfo_kafka_consume_start, ZEND_ACC_PUBLIC)
477     PHP_ME(RdKafka__ConsumerTopic, consumeStop, arginfo_kafka_consume_stop, ZEND_ACC_PUBLIC)
478     PHP_ME(RdKafka__ConsumerTopic, consume, arginfo_kafka_consume, ZEND_ACC_PUBLIC)
479     PHP_ME(RdKafka__ConsumerTopic, consumeBatch, arginfo_kafka_consume_batch, ZEND_ACC_PUBLIC)
480     PHP_ME(RdKafka__ConsumerTopic, offsetStore, arginfo_kafka_offset_store, ZEND_ACC_PUBLIC)
481     PHP_FE_END
482 };
483 
484 static const zend_function_entry kafka_kafka_consumer_topic_fe[] = {
485     PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
486     PHP_ME(RdKafka__ConsumerTopic, offsetStore, arginfo_kafka_offset_store, ZEND_ACC_PUBLIC)
487     PHP_FE_END
488 };
489 
490 /* {{{ proto void RdKafka\ProducerTopic::produce(int $partition, int $msgflags[, string $payload, string $key])
491    Produce and send a single message to broker. */
492 
493 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_produce, 0, 0, 2)
494     ZEND_ARG_INFO(0, partition)
495     ZEND_ARG_INFO(0, msgflags)
496     ZEND_ARG_INFO(0, payload)
497     ZEND_ARG_INFO(0, key)
ZEND_END_ARG_INFO()498 ZEND_END_ARG_INFO()
499 
500 PHP_METHOD(RdKafka__ProducerTopic, produce)
501 {
502     zend_long partition;
503     zend_long msgflags;
504     char *payload = NULL;
505     arglen_t payload_len = 0;
506     char *key = NULL;
507     arglen_t key_len = 0;
508     int ret;
509     rd_kafka_resp_err_t err;
510     kafka_topic_object *intern;
511 
512     if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!", &partition, &msgflags, &payload, &payload_len, &key, &key_len) == FAILURE) {
513         return;
514     }
515 
516     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
517         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
518         return;
519     }
520 
521     if (msgflags != 0 && msgflags != RD_KAFKA_MSG_F_BLOCK) {
522         zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Invalid value '%ld' for $msgflags", msgflags TSRMLS_CC);
523         return;
524     }
525 
526     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
527 
528     ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, NULL);
529 
530     if (ret == -1) {
531         err = rd_kafka_last_error();
532         zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
533         return;
534     }
535 }
536 /* }}} */
537 
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
/* {{{ proto void RdKafka\ProducerTopic::producev(int $partition, int $msgflags[, string $payload, string $key, array $headers, int $timestamp_ms])
   Produce and send a single message to broker (with headers possibility and timestamp).
   $msgflags must be 0 or RD_KAFKA_MSG_F_BLOCK; RD_KAFKA_MSG_F_COPY is always
   added before the call. A NULL $timestamp_ms is sent as 0. Throws
   InvalidArgumentException for invalid arguments and RdKafka\Exception when
   librdkafka reports an error. */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_producev, 0, 0, 2)
    ZEND_ARG_INFO(0, partition)
    ZEND_ARG_INFO(0, msgflags)
    ZEND_ARG_INFO(0, payload)
    ZEND_ARG_INFO(0, key)
    ZEND_ARG_INFO(0, headers)
    ZEND_ARG_INFO(0, timestamp_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__ProducerTopic, producev)
{
    zend_long partition;
    zend_long msgflags;
    char *payload = NULL;
    arglen_t payload_len = 0;
    char *key = NULL;
    arglen_t key_len = 0;
    rd_kafka_resp_err_t err;
    kafka_topic_object *intern;
    kafka_object *kafka_intern;
    HashTable *headersParam = NULL;
    HashPosition headersParamPos;
    char *header_key;
    zeval *header_value;
    rd_kafka_headers_t *headers;
    zend_long timestamp_ms = 0;
    zend_bool timestamp_ms_is_null = 0;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!h!l!", &partition, &msgflags, &payload, &payload_len, &key, &key_len, &headersParam, &timestamp_ms, &timestamp_ms_is_null) == FAILURE) {
        return;
    }

    if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
        /* ZEND_LONG_FMT matches the width of zend_long on every platform. */
        zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '" ZEND_LONG_FMT "' for $partition", partition);
        return;
    }

    if (msgflags != 0 && msgflags != RD_KAFKA_MSG_F_BLOCK) {
        zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Invalid value '" ZEND_LONG_FMT "' for $msgflags", msgflags);
        return;
    }

    if (timestamp_ms_is_null == 1) {
        timestamp_ms = 0;
    }

    /* get_kafka_topic_object() throws and returns NULL for an uninitialised
     * topic, so the result must be checked before intern is dereferenced. */
    intern = get_kafka_topic_object(getThis() TSRMLS_CC);
    if (!intern) {
        return;
    }

    /* Resolve the owning client before the headers are allocated, so that
     * this early return cannot leak them. */
    kafka_intern = get_kafka_object(P_ZEVAL(intern->zrk) TSRMLS_CC);
    if (!kafka_intern) {
        return;
    }

    if (headersParam != NULL && zend_hash_num_elements(headersParam) > 0) {
        headers = rd_kafka_headers_new(zend_hash_num_elements(headersParam));
        /* Iteration ends at the first entry that has no data or no key. */
        for (zend_hash_internal_pointer_reset_ex(headersParam, &headersParamPos);
                (header_value = rdkafka_hash_get_current_data_ex(headersParam, &headersParamPos)) != NULL &&
                (header_key = rdkafka_hash_get_current_key_ex(headersParam, &headersParamPos)) != NULL;
                zend_hash_move_forward_ex(headersParam, &headersParamPos)) {
            convert_to_string_ex(header_value);
            rd_kafka_header_add(
                headers,
                header_key,
                -1, // Auto detect header title length
                Z_STRVAL_P(ZEVAL(header_value)),
                Z_STRLEN_P(ZEVAL(header_value))
            );
        }
    } else {
        headers = rd_kafka_headers_new(0);
    }

    err = rd_kafka_producev(
            kafka_intern->rk,
            RD_KAFKA_V_RKT(intern->rkt),
            RD_KAFKA_V_PARTITION(partition),
            RD_KAFKA_V_MSGFLAGS(msgflags | RD_KAFKA_MSG_F_COPY),
            RD_KAFKA_V_VALUE(payload, payload_len),
            RD_KAFKA_V_KEY(key, key_len),
            RD_KAFKA_V_TIMESTAMP(timestamp_ms),
            RD_KAFKA_V_HEADERS(headers),
            RD_KAFKA_V_END
    );

    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        /* The headers are destroyed here only on failure; see the librdkafka
         * RD_KAFKA_V_HEADERS documentation for the ownership rules. */
        rd_kafka_headers_destroy(headers);
        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
    }
}
/* }}} */
#endif
634 
635 static const zend_function_entry kafka_producer_topic_fe[] = {
636     PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
637     PHP_ME(RdKafka__ProducerTopic, produce, arginfo_kafka_produce, ZEND_ACC_PUBLIC)
638 #ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
639     PHP_ME(RdKafka__ProducerTopic, producev, arginfo_kafka_producev, ZEND_ACC_PUBLIC)
640 #endif
641     PHP_FE_END
642 };
643 
644 /* {{{ proto string RdKafka\Topic::getName() */
645 
646 ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_topic_get_name, 0, 0, 0)
ZEND_END_ARG_INFO()647 ZEND_END_ARG_INFO()
648 
649 PHP_METHOD(RdKafka__Topic, getName)
650 {
651     kafka_topic_object *intern;
652 
653     if (zend_parse_parameters_none() == FAILURE) {
654         return;
655     }
656 
657     intern = get_kafka_topic_object(getThis() TSRMLS_CC);
658     if (!intern) {
659         return;
660     }
661 
662     RDKAFKA_RETURN_STRING(rd_kafka_topic_name(intern->rkt));
663 }
664 /* }}} */
665 
666 static const zend_function_entry kafka_topic_fe[] = {
667     PHP_ME(RdKafka__Topic, getName, arginfo_kafka_topic_get_name, ZEND_ACC_PUBLIC)
668     PHP_FE_END
669 };
670 
kafka_topic_minit(TSRMLS_D)671 void kafka_topic_minit(TSRMLS_D) { /* {{{ */
672 
673     zend_class_entry ce;
674 
675     memcpy(&object_handlers, zend_get_std_object_handlers(), sizeof(zend_object_handlers));
676     object_handlers.clone_obj = NULL;
677     set_object_handler_free_obj(&object_handlers, kafka_topic_free);
678     set_object_handler_offset(&object_handlers, XtOffsetOf(kafka_topic_object, std));
679 
680     INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Topic", kafka_topic_fe);
681     ce_kafka_topic = zend_register_internal_class(&ce TSRMLS_CC);
682     ce_kafka_topic->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS;
683     ce_kafka_topic->create_object = kafka_topic_new;
684 
685     INIT_NS_CLASS_ENTRY(ce, "RdKafka", "ConsumerTopic", kafka_consumer_topic_fe);
686     ce_kafka_consumer_topic = rdkafka_register_internal_class_ex(&ce, ce_kafka_topic TSRMLS_CC);
687 
688     INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumerTopic", kafka_kafka_consumer_topic_fe);
689     ce_kafka_kafka_consumer_topic = rdkafka_register_internal_class_ex(&ce, ce_kafka_topic TSRMLS_CC);
690 
691     INIT_NS_CLASS_ENTRY(ce, "RdKafka", "ProducerTopic", kafka_producer_topic_fe);
692     ce_kafka_producer_topic = rdkafka_register_internal_class_ex(&ce, ce_kafka_topic TSRMLS_CC);
693 } /* }}} */
694