1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 /**
30 * Tests "message.bytes.max"
31 * Issue #24
32 */
33
34 #include "test.h"
35
36 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
37 * is built from within the librdkafka source tree and thus differs. */
38 #include "rdkafka.h" /* for Kafka driver */
39
40
41 static int msgs_wait = 0; /* bitmask */
42
43 /**
44 * Delivery report callback.
45 * Called for each message once to signal its delivery status.
46 */
dr_cb(rd_kafka_t * rk,void * payload,size_t len,rd_kafka_resp_err_t err,void * opaque,void * msg_opaque)47 static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
48 rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
49 int msgid = *(int *)msg_opaque;
50
51 free(msg_opaque);
52
53 if (err)
54 TEST_FAIL("Unexpected delivery error for message #%i: %s\n",
55 msgid, rd_kafka_err2str(err));
56
57 if (!(msgs_wait & (1 << msgid)))
58 TEST_FAIL("Unwanted delivery report for message #%i "
59 "(waiting for 0x%x)\n", msgid, msgs_wait);
60
61 TEST_SAY("Delivery report for message #%i: %s\n",
62 msgid, rd_kafka_err2str(err));
63
64 msgs_wait &= ~(1 << msgid);
65 }
66
67
main_0003_msgmaxsize(int argc,char ** argv)68 int main_0003_msgmaxsize (int argc, char **argv) {
69 int partition = 0;
70 int r;
71 rd_kafka_t *rk;
72 rd_kafka_topic_t *rkt;
73 rd_kafka_conf_t *conf;
74 rd_kafka_topic_conf_t *topic_conf;
75 char errstr[512];
76
77 static const struct {
78 ssize_t keylen;
79 ssize_t len;
80 rd_kafka_resp_err_t exp_err;
81 } sizes[] = {
82 /* message.max.bytes is including framing */
83 { -1, 5000, RD_KAFKA_RESP_ERR_NO_ERROR },
84 { 0, 99900, RD_KAFKA_RESP_ERR_NO_ERROR },
85 { 0, 100000, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
86 { 100000, 0, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
87 { 1000, 100000, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
88 { 0, 101000, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
89 { 99000, -1, RD_KAFKA_RESP_ERR_NO_ERROR },
90 { -1, -1, RD_KAFKA_RESP_ERR__END }
91 };
92 int i;
93
94 test_conf_init(&conf, &topic_conf, 10);
95
96 /* Set a small maximum message size. */
97 if (rd_kafka_conf_set(conf, "message.max.bytes", "100000",
98 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
99 TEST_FAIL("%s\n", errstr);
100
101 /* Set delivery report callback */
102 rd_kafka_conf_set_dr_cb(conf, dr_cb);
103
104 /* Create kafka instance */
105 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
106
107 rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0003", 0),
108 topic_conf);
109 if (!rkt)
110 TEST_FAIL("Failed to create topic: %s\n",
111 rd_strerror(errno));
112
113 for (i = 0 ; sizes[i].exp_err != RD_KAFKA_RESP_ERR__END ; i++) {
114 void *value = sizes[i].len != -1 ?
115 calloc(1, sizes[i].len) : NULL;
116 size_t len = sizes[i].len != -1 ? sizes[i].len : 0;
117 void *key = sizes[i].keylen != -1 ?
118 calloc(1, sizes[i].keylen) : NULL;
119 size_t keylen = sizes[i].keylen != -1 ? sizes[i].keylen : 0;
120 int *msgidp = malloc(sizeof(*msgidp));
121 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
122
123 *msgidp = i;
124
125 r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
126 value, len,
127 key, keylen,
128 msgidp);
129 if (r == -1)
130 err = rd_kafka_last_error();
131
132 if (err != sizes[i].exp_err) {
133 TEST_FAIL("Msg #%d produce(len=%"PRIdsz
134 ", keylen=%"PRIdsz"): got %s, expected %s",
135 i,
136 sizes[i].len,
137 sizes[i].keylen,
138 rd_kafka_err2name(err),
139 rd_kafka_err2name(sizes[i].exp_err));
140 } else {
141 TEST_SAY("Msg #%d produce() returned expected %s "
142 "for value size %"PRIdsz
143 " and key size %"PRIdsz"\n",
144 i,
145 rd_kafka_err2name(err),
146 sizes[i].len,
147 sizes[i].keylen);
148
149 if (!sizes[i].exp_err)
150 msgs_wait |= (1 << i);
151 else
152 free(msgidp);
153 }
154 }
155
156 /* Wait for messages to be delivered. */
157 while (rd_kafka_outq_len(rk) > 0)
158 rd_kafka_poll(rk, 50);
159
160 if (msgs_wait != 0)
161 TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait);
162
163 /* Destroy topic */
164 rd_kafka_topic_destroy(rkt);
165
166 /* Destroy rdkafka instance */
167 TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
168 rd_kafka_destroy(rk);
169
170 return 0;
171 }
172