1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 /**
33  * Verify message timestamp behaviour on supporting brokers (>=0.10.0.0).
34  * Issue #858
35  */
/**
 * @brief Expected bounds for the timestamps of consumed messages.
 *
 * The two fields are handed to the message verifier as its
 * \c timestamp_min and \c timestamp_max settings.
 */
struct timestamp_range {
        int64_t min;  /**< Lower bound of the expected timestamp */
        int64_t max;  /**< Upper bound of the expected timestamp */
};

/** Expectation for the runs where no valid timestamp is expected back. */
static const struct timestamp_range invalid_timestamp = {
        .min = -1,
        .max = -1
};

/** Expected range for broker-assigned timestamps,
 *  filled in by prepare_timestamps(). */
static struct timestamp_range broker_timestamp;

/** Timestamp set by the producer on each message,
 *  filled in by prepare_timestamps(). */
static struct timestamp_range my_timestamp;
44 
prepare_timestamps(void)45 static void prepare_timestamps (void) {
46         struct timeval ts;
47         rd_gettimeofday(&ts, NULL);
48 
49         /* broker timestamps expected to be within 600 seconds */
50         broker_timestamp.min = (int64_t)ts.tv_sec * 1000LLU;
51         broker_timestamp.max = broker_timestamp.min + (600 * 1000LLU);
52 
53         /* client timestamps: set in the future (24 hours)
54          * to be outside of broker timestamps */
55         my_timestamp.min = my_timestamp.max =
56                 (int64_t)ts.tv_sec + (24 * 3600 * 1000LLU);
57 }
58 
59 /**
60  * @brief Produce messages according to compress \p codec
61  */
produce_msgs(const char * topic,int partition,uint64_t testid,int msgcnt,const char * broker_version,const char * codec)62 static void produce_msgs (const char *topic, int partition, uint64_t testid,
63                           int msgcnt, const char *broker_version,
64                           const char *codec) {
65         rd_kafka_conf_t *conf;
66         rd_kafka_t *rk;
67         int i;
68         char key[128], buf[100];
69         int msgcounter = msgcnt;
70 
71         test_conf_init(&conf, NULL, 0);
72         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
73         test_conf_set(conf, "compression.codec", codec);
74         test_conf_set(conf, "broker.version.fallback", broker_version);
75         if (!strncmp(broker_version, "0.8", 3) ||
76             !strncmp(broker_version, "0.9", 3)) {
77                 test_conf_set(conf, "api.version.request", "false");
78                 test_conf_set(conf, "enable.idempotence", "false");
79         }
80 
81         /* Make sure to trigger a bunch of MessageSets */
82         test_conf_set(conf, "batch.num.messages", tsprintf("%d", msgcnt/5));
83         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
84 
85         for (i = 0 ; i < msgcnt ; i++) {
86                 rd_kafka_resp_err_t err;
87 
88                 test_prepare_msg(testid, partition, i,
89                                  buf, sizeof(buf), key, sizeof(key));
90 
91                 err = rd_kafka_producev(rk,
92                                         RD_KAFKA_V_TOPIC(topic),
93                                         RD_KAFKA_V_VALUE(buf, sizeof(buf)),
94                                         RD_KAFKA_V_KEY(key, sizeof(key)),
95                                         RD_KAFKA_V_TIMESTAMP(my_timestamp.min),
96                                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
97                                         RD_KAFKA_V_OPAQUE(&msgcounter),
98                                         RD_KAFKA_V_END);
99                 if (err)
100                         TEST_FAIL("producev() failed at msg #%d/%d: %s",
101                                   i, msgcnt, rd_kafka_err2str(err));
102         }
103 
104         TEST_SAY("Waiting for %d messages to be produced\n", msgcounter);
105         while (msgcounter > 0)
106                 rd_kafka_poll(rk, 100);
107 
108         rd_kafka_destroy(rk);
109 }
110 
111 static void
consume_msgs_verify_timestamps(const char * topic,int partition,uint64_t testid,int msgcnt,const struct timestamp_range * exp_timestamp)112 consume_msgs_verify_timestamps (const char *topic, int partition,
113                                 uint64_t testid, int msgcnt,
114                                 const struct timestamp_range *exp_timestamp) {
115         test_msgver_t mv;
116 
117         test_msgver_init(&mv, testid);
118         test_consume_msgs_easy_mv(topic, topic, -1,
119                                   testid, -1, msgcnt, NULL, &mv);
120 
121         test_msgver_verify0(__FUNCTION__, __LINE__,
122                             topic, &mv,
123                             TEST_MSGVER_RANGE|
124                             TEST_MSGVER_BY_MSGID|TEST_MSGVER_BY_TIMESTAMP,
125                             (struct test_mv_vs){ .msg_base = 0,
126                                             .exp_cnt = msgcnt,
127                                             .timestamp_min = exp_timestamp->min,
128                                             .timestamp_max = exp_timestamp->max
129                                             });
130 
131         test_msgver_clear(&mv);
132 }
133 
134 
135 
test_timestamps(const char * broker_tstype,const char * broker_version,const char * codec,const struct timestamp_range * exp_timestamps)136 static void test_timestamps (const char *broker_tstype,
137                              const char *broker_version,
138                              const char *codec,
139                              const struct timestamp_range *exp_timestamps) {
140         const char *topic = test_mk_topic_name(
141                 tsprintf("0052_msg_timestamps_%s_%s_%s",
142                          broker_tstype, broker_version, codec), 1);
143         const int msgcnt = 20;
144         uint64_t testid = test_id_generate();
145 
146         if ((!strncmp(broker_version, "0.9", 3) ||
147              !strncmp(broker_version, "0.8", 3)) &&
148             !test_conf_match(NULL, "sasl.mechanisms", "GSSAPI")) {
149                 TEST_SAY(_C_YEL "Skipping %s, %s test: "
150                          "SaslHandshake not supported by broker v%s" _C_CLR "\n",
151                          broker_tstype, codec, broker_version);
152                 return;
153         }
154 
155         TEST_SAY(_C_MAG "Timestamp test using %s\n", topic);
156         test_timeout_set(30);
157 
158         test_kafka_topics("--create --topic \"%s\" "
159                           "--replication-factor 1 --partitions 1 "
160                           "--config message.timestamp.type=%s",
161                           topic, broker_tstype);
162 
163         TEST_SAY(_C_MAG "Producing %d messages to %s\n", msgcnt, topic);
164         produce_msgs(topic, 0, testid, msgcnt, broker_version, codec);
165 
166         TEST_SAY(_C_MAG "Consuming and verifying %d messages from %s "
167                  "with expected timestamps %"PRId64"..%"PRId64"\n",
168                  msgcnt, topic,
169                  exp_timestamps->min, exp_timestamps->max);
170 
171         consume_msgs_verify_timestamps(topic, 0, testid, msgcnt,
172                                        exp_timestamps);
173 }
174 
175 
main_0052_msg_timestamps(int argc,char ** argv)176 int main_0052_msg_timestamps (int argc, char **argv) {
177 
178         if (!test_can_create_topics(1))
179                 return 0;
180 
181         /* Broker version limits the producer's feature set,
182          * for 0.9.0.0 no timestamp will be transmitted,
183          * but for 0.10.1.0 (or newer, api.version.request will be true)
184          * the producer will set the timestamp.
185          * In all cases we want a reasonable timestamp back.
186          *
187          * Explicit broker LogAppendTime setting will overwrite
188          * any producer-provided offset.
189          *
190          * Using the old non-timestamp-aware protocol without
191          * LogAppendTime will cause unset/invalid timestamps .
192          *
193          * Any other option should honour the producer create timestamps.
194          */
195         prepare_timestamps();
196 
197         test_timestamps("CreateTime",    "0.10.1.0", "none", &my_timestamp);
198         test_timestamps("LogAppendTime", "0.10.1.0", "none", &broker_timestamp);
199         test_timestamps("CreateTime",    "0.9.0.0",  "none", &invalid_timestamp);
200         test_timestamps("LogAppendTime", "0.9.0.0",  "none", &broker_timestamp);
201 #if WITH_ZLIB
202         test_timestamps("CreateTime",    "0.10.1.0", "gzip", &my_timestamp);
203         test_timestamps("LogAppendTime", "0.10.1.0", "gzip", &broker_timestamp);
204         test_timestamps("CreateTime",    "0.9.0.0",  "gzip", &invalid_timestamp);
205         test_timestamps("LogAppendTime", "0.9.0.0",  "gzip", &broker_timestamp);
206 #endif
207 
208         return 0;
209 }
210