1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 /**
33 * Verify message timestamp behaviour on supporting brokers (>=0.10.0.0).
34 * Issue #858
35 */
36 struct timestamp_range {
37 int64_t min;
38 int64_t max;
39 };
40
41 static const struct timestamp_range invalid_timestamp = { -1, -1 };
42 static struct timestamp_range broker_timestamp;
43 static struct timestamp_range my_timestamp;
44
prepare_timestamps(void)45 static void prepare_timestamps (void) {
46 struct timeval ts;
47 rd_gettimeofday(&ts, NULL);
48
49 /* broker timestamps expected to be within 600 seconds */
50 broker_timestamp.min = (int64_t)ts.tv_sec * 1000LLU;
51 broker_timestamp.max = broker_timestamp.min + (600 * 1000LLU);
52
53 /* client timestamps: set in the future (24 hours)
54 * to be outside of broker timestamps */
55 my_timestamp.min = my_timestamp.max =
56 (int64_t)ts.tv_sec + (24 * 3600 * 1000LLU);
57 }
58
59 /**
60 * @brief Produce messages according to compress \p codec
61 */
produce_msgs(const char * topic,int partition,uint64_t testid,int msgcnt,const char * broker_version,const char * codec)62 static void produce_msgs (const char *topic, int partition, uint64_t testid,
63 int msgcnt, const char *broker_version,
64 const char *codec) {
65 rd_kafka_conf_t *conf;
66 rd_kafka_t *rk;
67 int i;
68 char key[128], buf[100];
69 int msgcounter = msgcnt;
70
71 test_conf_init(&conf, NULL, 0);
72 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
73 test_conf_set(conf, "compression.codec", codec);
74 test_conf_set(conf, "broker.version.fallback", broker_version);
75 if (!strncmp(broker_version, "0.8", 3) ||
76 !strncmp(broker_version, "0.9", 3)) {
77 test_conf_set(conf, "api.version.request", "false");
78 test_conf_set(conf, "enable.idempotence", "false");
79 }
80
81 /* Make sure to trigger a bunch of MessageSets */
82 test_conf_set(conf, "batch.num.messages", tsprintf("%d", msgcnt/5));
83 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
84
85 for (i = 0 ; i < msgcnt ; i++) {
86 rd_kafka_resp_err_t err;
87
88 test_prepare_msg(testid, partition, i,
89 buf, sizeof(buf), key, sizeof(key));
90
91 err = rd_kafka_producev(rk,
92 RD_KAFKA_V_TOPIC(topic),
93 RD_KAFKA_V_VALUE(buf, sizeof(buf)),
94 RD_KAFKA_V_KEY(key, sizeof(key)),
95 RD_KAFKA_V_TIMESTAMP(my_timestamp.min),
96 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
97 RD_KAFKA_V_OPAQUE(&msgcounter),
98 RD_KAFKA_V_END);
99 if (err)
100 TEST_FAIL("producev() failed at msg #%d/%d: %s",
101 i, msgcnt, rd_kafka_err2str(err));
102 }
103
104 TEST_SAY("Waiting for %d messages to be produced\n", msgcounter);
105 while (msgcounter > 0)
106 rd_kafka_poll(rk, 100);
107
108 rd_kafka_destroy(rk);
109 }
110
111 static void
consume_msgs_verify_timestamps(const char * topic,int partition,uint64_t testid,int msgcnt,const struct timestamp_range * exp_timestamp)112 consume_msgs_verify_timestamps (const char *topic, int partition,
113 uint64_t testid, int msgcnt,
114 const struct timestamp_range *exp_timestamp) {
115 test_msgver_t mv;
116
117 test_msgver_init(&mv, testid);
118 test_consume_msgs_easy_mv(topic, topic, -1,
119 testid, -1, msgcnt, NULL, &mv);
120
121 test_msgver_verify0(__FUNCTION__, __LINE__,
122 topic, &mv,
123 TEST_MSGVER_RANGE|
124 TEST_MSGVER_BY_MSGID|TEST_MSGVER_BY_TIMESTAMP,
125 (struct test_mv_vs){ .msg_base = 0,
126 .exp_cnt = msgcnt,
127 .timestamp_min = exp_timestamp->min,
128 .timestamp_max = exp_timestamp->max
129 });
130
131 test_msgver_clear(&mv);
132 }
133
134
135
test_timestamps(const char * broker_tstype,const char * broker_version,const char * codec,const struct timestamp_range * exp_timestamps)136 static void test_timestamps (const char *broker_tstype,
137 const char *broker_version,
138 const char *codec,
139 const struct timestamp_range *exp_timestamps) {
140 const char *topic = test_mk_topic_name(
141 tsprintf("0052_msg_timestamps_%s_%s_%s",
142 broker_tstype, broker_version, codec), 1);
143 const int msgcnt = 20;
144 uint64_t testid = test_id_generate();
145
146 if ((!strncmp(broker_version, "0.9", 3) ||
147 !strncmp(broker_version, "0.8", 3)) &&
148 !test_conf_match(NULL, "sasl.mechanisms", "GSSAPI")) {
149 TEST_SAY(_C_YEL "Skipping %s, %s test: "
150 "SaslHandshake not supported by broker v%s" _C_CLR "\n",
151 broker_tstype, codec, broker_version);
152 return;
153 }
154
155 TEST_SAY(_C_MAG "Timestamp test using %s\n", topic);
156 test_timeout_set(30);
157
158 test_kafka_topics("--create --topic \"%s\" "
159 "--replication-factor 1 --partitions 1 "
160 "--config message.timestamp.type=%s",
161 topic, broker_tstype);
162
163 TEST_SAY(_C_MAG "Producing %d messages to %s\n", msgcnt, topic);
164 produce_msgs(topic, 0, testid, msgcnt, broker_version, codec);
165
166 TEST_SAY(_C_MAG "Consuming and verifying %d messages from %s "
167 "with expected timestamps %"PRId64"..%"PRId64"\n",
168 msgcnt, topic,
169 exp_timestamps->min, exp_timestamps->max);
170
171 consume_msgs_verify_timestamps(topic, 0, testid, msgcnt,
172 exp_timestamps);
173 }
174
175
main_0052_msg_timestamps(int argc,char ** argv)176 int main_0052_msg_timestamps (int argc, char **argv) {
177
178 if (!test_can_create_topics(1))
179 return 0;
180
181 /* Broker version limits the producer's feature set,
182 * for 0.9.0.0 no timestamp will be transmitted,
183 * but for 0.10.1.0 (or newer, api.version.request will be true)
184 * the producer will set the timestamp.
185 * In all cases we want a reasonable timestamp back.
186 *
187 * Explicit broker LogAppendTime setting will overwrite
188 * any producer-provided offset.
189 *
190 * Using the old non-timestamp-aware protocol without
191 * LogAppendTime will cause unset/invalid timestamps .
192 *
193 * Any other option should honour the producer create timestamps.
194 */
195 prepare_timestamps();
196
197 test_timestamps("CreateTime", "0.10.1.0", "none", &my_timestamp);
198 test_timestamps("LogAppendTime", "0.10.1.0", "none", &broker_timestamp);
199 test_timestamps("CreateTime", "0.9.0.0", "none", &invalid_timestamp);
200 test_timestamps("LogAppendTime", "0.9.0.0", "none", &broker_timestamp);
201 #if WITH_ZLIB
202 test_timestamps("CreateTime", "0.10.1.0", "gzip", &my_timestamp);
203 test_timestamps("LogAppendTime", "0.10.1.0", "gzip", &broker_timestamp);
204 test_timestamps("CreateTime", "0.9.0.0", "gzip", &invalid_timestamp);
205 test_timestamps("LogAppendTime", "0.9.0.0", "gzip", &broker_timestamp);
206 #endif
207
208 return 0;
209 }
210