1 /*
2  * librdkafka - The Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2019 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 /**
31  * @name SSL certificates
32  *
33  */
34 
35 #include "rdkafka_int.h"
36 #include "rdkafka_transport_int.h"
37 
38 
39 #if WITH_SSL
40 #include "rdkafka_ssl.h"
41 
42 #include <openssl/x509.h>
43 #include <openssl/evp.h>
44 
45 /**
46  * @brief OpenSSL password query callback using a conf struct.
47  *
48  * @locality application thread
49  */
rd_kafka_conf_ssl_passwd_cb(char * buf,int size,int rwflag,void * userdata)50 static int rd_kafka_conf_ssl_passwd_cb (char *buf, int size, int rwflag,
51                                         void *userdata) {
52         const rd_kafka_conf_t *conf = userdata;
53         int pwlen;
54 
55         if (!conf->ssl.key_password)
56                 return -1;
57 
58         pwlen = (int) strlen(conf->ssl.key_password);
59         memcpy(buf, conf->ssl.key_password, RD_MIN(pwlen, size));
60 
61         return pwlen;
62 }
63 
64 
65 
/** Printable certificate type names, looked up by rd_kafka_cert_type_t. */
static const char *rd_kafka_cert_type_names[] = {
        "public-key", "private-key", "CA"
};
71 
/** Printable certificate encoding names, looked up by rd_kafka_cert_enc_t. */
static const char *rd_kafka_cert_enc_names[] = {
        "PKCS#12", "DER", "PEM"
};
77 
78 
79 /**
80  * @brief Destroy a certificate
81  */
rd_kafka_cert_destroy(rd_kafka_cert_t * cert)82 static void rd_kafka_cert_destroy (rd_kafka_cert_t *cert) {
83         if (rd_refcnt_sub(&cert->refcnt) > 0)
84                 return;
85 
86         if (cert->x509)
87                 X509_free(cert->x509);
88         if (cert->pkey)
89                 EVP_PKEY_free(cert->pkey);
90         if (cert->store)
91                 X509_STORE_free(cert->store);
92 
93         rd_free(cert);
94 }
95 
96 
97 /**
98  * @brief Create a copy of a cert
99  */
rd_kafka_cert_dup(rd_kafka_cert_t * src)100 static rd_kafka_cert_t *rd_kafka_cert_dup (rd_kafka_cert_t *src) {
101         rd_refcnt_add(&src->refcnt);
102         return src;
103 }
104 
105 /**
106  * @brief Print the OpenSSL error stack do stdout, for development use.
107  */
rd_kafka_print_ssl_errors(void)108 static RD_UNUSED void rd_kafka_print_ssl_errors (void) {
109         unsigned long l;
110         const char *file, *data;
111         int line, flags;
112 
113         while ((l = ERR_get_error_line_data(&file, &line,
114                                             &data, &flags)) != 0) {
115                 char buf[256];
116 
117                 ERR_error_string_n(l, buf, sizeof(buf));
118 
119                 printf("ERR: %s:%d: %s: %s:\n",
120                        file, line, buf, (flags & ERR_TXT_STRING) ? data : "");
121                 printf("  %lu:%s : %s : %s : %d : %s (%p, %d, fl 0x%x)\n",
122                        l,
123                        ERR_lib_error_string(l),
124                        ERR_func_error_string(l),
125                        file, line,
126                        (flags & ERR_TXT_STRING) && data && *data ?
127                        data : ERR_reason_error_string(l),
128                        data, data ? (int)strlen(data) : -1,
129                        flags & ERR_TXT_STRING);
130 
131         }
132 }
133 
134 /**
135  * @returns a cert structure with a copy of the memory in \p buffer on success,
136  *          or NULL on failure in which case errstr will have a human-readable
137  *          error string written to it.
138  */
rd_kafka_cert_new(const rd_kafka_conf_t * conf,rd_kafka_cert_type_t type,rd_kafka_cert_enc_t encoding,const void * buffer,size_t size,char * errstr,size_t errstr_size)139 static rd_kafka_cert_t *rd_kafka_cert_new (const rd_kafka_conf_t *conf,
140                                            rd_kafka_cert_type_t type,
141                                            rd_kafka_cert_enc_t encoding,
142                                            const void *buffer, size_t size,
143                                            char *errstr, size_t errstr_size) {
144         static const rd_bool_t
145                 valid[RD_KAFKA_CERT__CNT][RD_KAFKA_CERT_ENC__CNT] = {
146                 /* Valid encodings per certificate type */
147                 [RD_KAFKA_CERT_PUBLIC_KEY] = {
148                         [RD_KAFKA_CERT_ENC_PKCS12] = rd_true,
149                         [RD_KAFKA_CERT_ENC_DER] = rd_true,
150                         [RD_KAFKA_CERT_ENC_PEM] =  rd_true
151                 },
152                 [RD_KAFKA_CERT_PRIVATE_KEY] = {
153                         [RD_KAFKA_CERT_ENC_PKCS12] = rd_true,
154                         [RD_KAFKA_CERT_ENC_DER] = rd_true,
155                         [RD_KAFKA_CERT_ENC_PEM] =  rd_true
156                 },
157                 [RD_KAFKA_CERT_CA] = {
158                         [RD_KAFKA_CERT_ENC_PKCS12] = rd_true,
159                         [RD_KAFKA_CERT_ENC_DER] = rd_true,
160                         [RD_KAFKA_CERT_ENC_PEM] = rd_true
161                 },
162         };
163         const char *action = "";
164         BIO *bio;
165         rd_kafka_cert_t *cert = NULL;
166         PKCS12 *p12 = NULL;
167 
168         if ((int)type < 0 || type >= RD_KAFKA_CERT__CNT) {
169                 rd_snprintf(errstr, errstr_size,
170                             "Invalid certificate type %d", (int)type);
171                 return NULL;
172         }
173 
174         if ((int)encoding < 0 || encoding >= RD_KAFKA_CERT_ENC__CNT) {
175                 rd_snprintf(errstr, errstr_size,
176                             "Invalid certificate encoding %d", (int)encoding);
177                 return NULL;
178         }
179 
180         if (!valid[type][encoding]) {
181                 rd_snprintf(errstr, errstr_size,
182                             "Invalid encoding %s for certificate type %s",
183                             rd_kafka_cert_enc_names[encoding],
184                             rd_kafka_cert_type_names[type]);
185                 return NULL;
186         }
187 
188         action = "read memory";
189         bio = BIO_new_mem_buf((void *)buffer, (long)size);
190         if (!bio)
191                 goto fail;
192 
193         if (encoding == RD_KAFKA_CERT_ENC_PKCS12) {
194                 action = "read PKCS#12";
195                 p12 = d2i_PKCS12_bio(bio, NULL);
196                 if (!p12)
197                         goto fail;
198         }
199 
200         cert = rd_calloc(1, sizeof(*cert));
201         cert->type = type;
202         cert->encoding = encoding;
203 
204         rd_refcnt_init(&cert->refcnt, 1);
205 
206         switch (type)
207         {
208         case RD_KAFKA_CERT_CA:
209                 cert->store = X509_STORE_new();
210 
211                 switch (encoding)
212                 {
213                         case RD_KAFKA_CERT_ENC_PKCS12:
214                         {
215                                 EVP_PKEY *ign_pkey;
216                                 X509 *ign_cert;
217                                 STACK_OF(X509) *cas = NULL;
218                                 int i;
219 
220                                 action = "parse PKCS#12";
221                                 if (!PKCS12_parse(p12, conf->ssl.key_password,
222                                                   &ign_pkey, &ign_cert,
223                                                   &cas))
224                                         goto fail;
225 
226                                 EVP_PKEY_free(ign_pkey);
227                                 X509_free(ign_cert);
228 
229                                 if (!cas || sk_X509_num(cas) < 1) {
230                                         action = "retrieve at least one CA "
231                                                 "cert from PKCS#12";
232                                         if (cas)
233                                                 sk_X509_pop_free(cas,
234                                                                  X509_free);
235                                         goto fail;
236                                 }
237 
238                                 for (i = 0 ; i < sk_X509_num(cas) ; i++) {
239                                         if (!X509_STORE_add_cert(
240                                                     cert->store,
241                                                     sk_X509_value(cas, i))) {
242                                                 action = "add certificate to "
243                                                         "X.509 store";
244                                                 sk_X509_pop_free(cas,
245                                                                  X509_free);
246                                                 goto fail;
247                                         }
248                                 }
249 
250                                 sk_X509_pop_free(cas, X509_free);
251                         }
252                         break;
253 
254                         case RD_KAFKA_CERT_ENC_DER:
255                         {
256                                 X509 *x509;
257 
258                                 action = "read DER / X.509 ASN.1";
259                                 if (!(x509 = d2i_X509_bio(bio, NULL)))
260                                         goto fail;
261 
262                                 if (!X509_STORE_add_cert(cert->store, x509)) {
263                                         action = "add certificate to "
264                                                 "X.509 store";
265                                         X509_free(x509);
266                                         goto fail;
267                                 }
268                         }
269                         break;
270 
271                         case RD_KAFKA_CERT_ENC_PEM:
272                         {
273                                 X509 *x509;
274                                 int cnt = 0;
275 
276                                 action = "read PEM";
277 
278                                 /* This will read one certificate per call
279                                  * until an error occurs or the end of the
280                                  * buffer is reached (which is an error
281                                  * we'll need to clear). */
282                                 while ((x509 =
283                                         PEM_read_bio_X509(
284                                                 bio, NULL,
285                                                 rd_kafka_conf_ssl_passwd_cb,
286                                                 (void *)conf))) {
287 
288                                         if (!X509_STORE_add_cert(cert->store,
289                                                                  x509)) {
290                                                 action = "add certificate to "
291                                                         "X.509 store";
292                                                 X509_free(x509);
293                                                 goto fail;
294                                         }
295 
296                                         cnt++;
297                                 }
298 
299                                 if (!BIO_eof(bio)) {
300                                         /* Encountered parse error before
301                                          * reaching end, propagate error and
302                                          * fail. */
303                                         goto fail;
304                                 }
305 
306                                 if (!cnt) {
307                                         action = "retrieve at least one "
308                                                 "CA cert from PEM";
309 
310                                         goto fail;
311                                 }
312 
313                                 /* Reached end, which is raised as an error,
314                                  * so clear it since it is not. */
315                                 ERR_clear_error();
316                         }
317                         break;
318 
319                         default:
320                                 RD_NOTREACHED();
321                                 break;
322                 }
323                 break;
324 
325 
326         case RD_KAFKA_CERT_PUBLIC_KEY:
327                 switch (encoding)
328                 {
329                 case RD_KAFKA_CERT_ENC_PKCS12:
330                 {
331                         EVP_PKEY *ign_pkey;
332 
333                         action = "parse PKCS#12";
334                         if (!PKCS12_parse(p12, conf->ssl.key_password,
335                                           &ign_pkey, &cert->x509, NULL))
336                                 goto fail;
337 
338                         EVP_PKEY_free(ign_pkey);
339 
340                         action = "retrieve public key";
341                         if (!cert->x509)
342                                 goto fail;
343                 }
344                 break;
345 
346                 case RD_KAFKA_CERT_ENC_DER:
347                         action = "read DER / X.509 ASN.1";
348                         cert->x509 = d2i_X509_bio(bio, NULL);
349                         if (!cert->x509)
350                                 goto fail;
351                         break;
352 
353                 case RD_KAFKA_CERT_ENC_PEM:
354                         action = "read PEM";
355                         cert->x509 = PEM_read_bio_X509(
356                                 bio, NULL, rd_kafka_conf_ssl_passwd_cb,
357                                 (void *)conf);
358                         if (!cert->x509)
359                                 goto fail;
360                         break;
361 
362                 default:
363                         RD_NOTREACHED();
364                         break;
365                 }
366                 break;
367 
368 
369         case RD_KAFKA_CERT_PRIVATE_KEY:
370                 switch (encoding)
371                 {
372                 case RD_KAFKA_CERT_ENC_PKCS12:
373                 {
374                         X509 *x509;
375 
376                         action = "parse PKCS#12";
377                         if (!PKCS12_parse(p12, conf->ssl.key_password,
378                                           &cert->pkey, &x509, NULL))
379                                 goto fail;
380 
381                         X509_free(x509);
382 
383                         action = "retrieve private key";
384                         if (!cert->pkey)
385                                 goto fail;
386                 }
387                 break;
388 
389                 case RD_KAFKA_CERT_ENC_DER:
390                         action = "read DER / X.509 ASN.1 and "
391                                 "convert to EVP_PKEY";
392                         cert->pkey = d2i_PrivateKey_bio(bio, NULL);
393                         if (!cert->pkey)
394                                 goto fail;
395                         break;
396 
397                 case RD_KAFKA_CERT_ENC_PEM:
398                         action = "read PEM";
399                         cert->pkey = PEM_read_bio_PrivateKey(
400                                 bio, NULL, rd_kafka_conf_ssl_passwd_cb,
401                                 (void *)conf);
402                         if (!cert->pkey)
403                                 goto fail;
404                         break;
405 
406                 default:
407                         RD_NOTREACHED();
408                         break;
409                 }
410                 break;
411 
412         default:
413                 RD_NOTREACHED();
414                 break;
415         }
416 
417         if (bio)
418                 BIO_free(bio);
419         if (p12)
420                 PKCS12_free(p12);
421 
422         return cert;
423 
424  fail:
425         rd_snprintf(errstr, errstr_size,
426                     "Failed to %s %s (encoding %s): %s",
427                     action,
428                     rd_kafka_cert_type_names[type],
429                     rd_kafka_cert_enc_names[encoding],
430                     rd_kafka_ssl_last_error_str());
431 
432         if (cert)
433                 rd_kafka_cert_destroy(cert);
434         if (bio)
435                 BIO_free(bio);
436         if (p12)
437                 PKCS12_free(p12);
438 
439         return NULL;
440 }
441 #endif /* WITH_SSL */
442 
443 
444 /**
445  * @name Public API
 * @brief These public methods must be available regardless of whether
 *        librdkafka was built with OpenSSL or not.
448  * @{
449  */
450 
451 rd_kafka_conf_res_t
rd_kafka_conf_set_ssl_cert(rd_kafka_conf_t * conf,rd_kafka_cert_type_t cert_type,rd_kafka_cert_enc_t cert_enc,const void * buffer,size_t size,char * errstr,size_t errstr_size)452 rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf,
453                             rd_kafka_cert_type_t cert_type,
454                             rd_kafka_cert_enc_t cert_enc,
455                             const void *buffer, size_t size,
456                             char *errstr, size_t errstr_size) {
457 #if !WITH_SSL
458         rd_snprintf(errstr, errstr_size,
459                     "librdkafka not built with OpenSSL support");
460         return RD_KAFKA_CONF_INVALID;
461 #else
462         rd_kafka_cert_t *cert;
463         rd_kafka_cert_t **cert_map[RD_KAFKA_CERT__CNT] = {
464                 [RD_KAFKA_CERT_PUBLIC_KEY]  = &conf->ssl.cert,
465                 [RD_KAFKA_CERT_PRIVATE_KEY] = &conf->ssl.key,
466                 [RD_KAFKA_CERT_CA]          = &conf->ssl.ca
467         };
468         rd_kafka_cert_t **certp;
469 
470         if ((int)cert_type < 0 || cert_type >= RD_KAFKA_CERT__CNT) {
471                 rd_snprintf(errstr, errstr_size,
472                             "Invalid certificate type %d", (int)cert_type);
473                 return RD_KAFKA_CONF_INVALID;
474         }
475 
476         /* Make sure OpenSSL is loaded */
477         rd_kafka_global_init();
478 
479         certp = cert_map[cert_type];
480 
481         if (!buffer) {
482                 /* Clear current value */
483                 if (*certp) {
484                         rd_kafka_cert_destroy(*certp);
485                         *certp = NULL;
486                 }
487                 return RD_KAFKA_CONF_OK;
488         }
489 
490         cert = rd_kafka_cert_new(conf, cert_type, cert_enc, buffer, size,
491                                  errstr, errstr_size);
492         if (!cert)
493                 return RD_KAFKA_CONF_INVALID;
494 
495         if (*certp)
496                 rd_kafka_cert_destroy(*certp);
497 
498         *certp = cert;
499 
500         return RD_KAFKA_CONF_OK;
501 #endif
502 }
503 
504 
505 
/**
 * @brief Destructor called when configuration object is destroyed.
 *
 * Releases the configuration's references to its SSL key, certificate and
 * CA certs (those that are set) and resets the corresponding fields to NULL.
 * Does nothing when built without SSL support.
 *
 * @param scope Must be _RK_GLOBAL.
 * @param pconf The rd_kafka_conf_t to clean up.
 */
void rd_kafka_conf_cert_dtor (int scope, void *pconf) {
#if WITH_SSL
        rd_kafka_conf_t *conf = pconf;
        rd_kafka_cert_t **slots[] = {
                &conf->ssl.key,
                &conf->ssl.cert,
                &conf->ssl.ca
        };
        size_t i;

        assert(scope == _RK_GLOBAL);

        for (i = 0 ; i < sizeof(slots) / sizeof(slots[0]) ; i++) {
                if (!*slots[i])
                        continue;

                rd_kafka_cert_destroy(*slots[i]);
                *slots[i] = NULL;
        }
#endif
}
527 
/**
 * @brief Copy-constructor called when configuration object \p psrc is
 *        duplicated to \p pdst.
 *
 * Any certs already set on the destination are released first. Each cert
 * set on the source is then shared with the destination by taking an
 * additional reference to it. Does nothing when built without SSL support.
 *
 * @param scope Must be _RK_GLOBAL.
 * @param pdst  Destination rd_kafka_conf_t.
 * @param psrc  Source rd_kafka_conf_t.
 *
 * The remaining parameters are not used.
 */
void rd_kafka_conf_cert_copy (int scope, void *pdst, const void *psrc,
                              void *dstptr, const void *srcptr,
                              size_t filter_cnt, const char **filter) {
#if WITH_SSL
        const rd_kafka_conf_t *from = psrc;
        rd_kafka_conf_t *to = pdst;

        assert(scope == _RK_GLOBAL);

        /* Start out with a destination that holds no certs at all. */
        rd_kafka_conf_cert_dtor(scope, pdst);

        if (from->ssl.key != NULL)
                to->ssl.key = rd_kafka_cert_dup(from->ssl.key);

        if (from->ssl.cert != NULL)
                to->ssl.cert = rd_kafka_cert_dup(from->ssl.cert);

        if (from->ssl.ca != NULL)
                to->ssl.ca = rd_kafka_cert_dup(from->ssl.ca);
#endif
}
554 
555 
556 /**@}*/
557