1 /* zxbusprod.c  -  Liberty oriented logging facility with log signing and encryption
2  * Copyright (c) 2012-2013 Synergetics (sampo@synergetics.be), All Rights Reserved.
3  * Copyright (c) 2010-2011 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
4  * Copyright (c) 2006-2009 Symlabs (symlabs@symlabs.com), All Rights Reserved.
5  * Author: Sampo Kellomaki (sampo@iki.fi)
6  * This is confidential unpublished proprietary source code of the author.
7  * NO WARRANTY, not even implied warranties. Contains trade secrets.
8  * Distribution prohibited unless authorized in writing.
9  * Licensed under Apache License 2.0, see file COPYING.
10  * $Id$
11  *
12  * 17.8.2012,  created, based on zxlog.c --Sampo
13  * 19.8.2012,  added tolerance for CRLF where strictly LF is meant --Sampo
14  * 6.9.2012,   added SSL support --Sampo
15  * 9.9.2012,   added persist support --Sampo
16  * 30.11.2013, fixed seconds handling re gmtime_r() - found by valgrind --Sampo
17  *
18  * Apart from formatting code, this is effectively a STOMP 1.1 client. Typically
19  * it will talk to zxbusd instances configured using BUS_URL options.
20  *
21  * See also:  http://stomp.github.com/stomp-specification-1.1.html (20110331)
22  * Todo: implement anti fragmentation option (tcp CORK (check Nagle algo) or
23  * bundle writes in this code).
24  */
25 
26 #include "platform.h"  /* needed on Win32 for pthread_mutex_lock() et al. */
27 
28 #include <fcntl.h>
29 #include <string.h>
30 #include <stdarg.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <errno.h>
34 #include <time.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #ifdef MINGW
38 # include <winsock.h>
39 # ifndef EINPROGRESS
40  #  define EINPROGRESS WSAEINPROGRESS   /* Missing in mingw 1.0, 3.0 defines this in errno.h*/
41  # endif
42  #else
43  # include <netdb.h>
44  # include <netinet/in.h>  /* struct sockaddr_in */
45  #endif
46 
47  #ifdef USE_OPENSSL
48  #include <openssl/x509.h>
49  #include <openssl/rsa.h>
50  #include <openssl/evp.h>
51  #include <openssl/aes.h>
52  #include <openssl/ssl.h>
53  #endif
54 
55  #include "errmac.h"
56  #include "zxid.h"
57  #include "zxidutil.h"  /* for zx_zlib_raw_deflate(), safe_basis_64, and name_from_path */
58  #include "zxidconf.h"
59  #include "c/zx-data.h" /* Generated. If missing, run `make dep ENA_GEN=1' */
60 
/* Size in bytes of the read buffer that zxbus_open_bus_url() allocates for bu->m. */
#define ZXBUS_BUF_SIZE 4096
/* Alias some struct fields for headers that can not be seen together.
 * N.B. Because several header names share one field, whichever header is
 * parsed first for a given field wins (see the HDR() macro in zxbus_read_stomp()). */
#define receipt   host
#define rcpt_id   host
#define acpt_vers vers
#define tx_id     vers
#define session   login
#define subs_id   login
#define subsc     login
#define server    pw
#define ack       pw
#define msg_id    pw
#define heart_bt  dest
#define zx_rcpt_sig dest

/* Fewer buffered bytes than this can not hold a complete frame, so the reader asks for more. */
#define STOMP_MIN_PDU_SIZE (sizeof("ACK\n\n\0\n")-1)
extern int zxbus_persist_flag; /* This is defined by option processing of zxbuslist */
int zxbus_verbose = 0;         /* This is set by option processing in zxbustailf */
int zxbus_ascii_color = 0;     /* Defined in option processing of zxbustailf or zxbuslist */

#define SSL_ENCRYPTED_HINT "TLS or SSL connection wanted but other end did not speak protocol.\n"
/* Timestamp layout YYYYMMDD-hhmmss.mmm. Always pair with ZXBUS_TIME_ARG(). */
#define ZXBUS_TIME_FMT "%04d%02d%02d-%02d%02d%02d.%03ld"
/* Expands to the seven arguments ZXBUS_TIME_FMT consumes. t is a struct tm
 * lvalue, usec a microsecond count. Arguments are parenthesized so that
 * expressions such as a+b are divided as a whole, and the millisecond value
 * is cast to long to match the %03ld conversion regardless of usec's type. */
#define ZXBUS_TIME_ARG(t,usec) (t).tm_year + 1900, (t).tm_mon + 1, (t).tm_mday, \
				(t).tm_hour, (t).tm_min, (t).tm_sec, (long)((usec)/1000)
85 
86  #if 0
87  /*() Allocate memory for logging purposes.
88   * Generally memory allocation goes via zx_alloc() family of functions. However
89   * dues to special requirements of cryptographically implemeted logging,
90   * we maintain this special allocation function (which backends to zx_alloc()).
91   * Among the special features: This function makes sure the buffer size is
92   * rounded up to multiple of nonce to accommodate block ciphers.
93   *
94   * This function is considered internal. Do not use unless you know what you are doing. */
95 
96  /* Called by:  zxbus_write_line x3 */
97  static char* zxbus_alloc_zbuf(zxid_conf* cf, int *zlen, char* zbuf, int len, char* sig, int nonce)
98  {
99    char* p;
100    int siz = nonce + 2 + len + *zlen;
101    ROUND_UP(siz, nonce);        /* Round up to block size */
102    p = ZX_ALLOC(cf->ctx, siz);
103    if (nonce)
104      zx_rand(p, nonce);
105    p[nonce] = (len >> 8) & 0xff;
106    p[nonce+1] = len & 0xff;
107    if (len) {
108      memcpy(p+nonce+2, sig, len);
109      ZX_FREE(cf->ctx, sig);
110    }
111    memcpy(p+nonce+2+len, zbuf, *zlen);
112    ZX_FREE(cf->ctx, zbuf);
113    *zlen += nonce + 2 + len;
114    return p;
115  }
116 
117  /*() Write a line to a log, taking care of all formalities of locking and
118  * observing all special options for signing and encryption of the logs.
119  * Not usually called directly (but you can if you want to), this is the
120  * work horse behind zxbus().
121  *
122  * cf::  ZXID configuration object, used for memory allocation.
123  * c_path:: Path to the log file, as C string
124  * encflags:: Encryption flags. See LOG_ERR or LOG_ACT configuration options in zxidconf.h
125  * n:: length of log data
126  * logbuf:: The data that should be logged
127  */
128 
129  /* Called by: */
130  void zxbus_write_line(zxid_conf* cf, char* c_path, int encflags, int n, const char* logbuf)
131  {
132    EVP_PKEY* log_sign_pkey;
133    struct rsa_st* rsa_pkey;
134    struct aes_key_st aes_key;
135    int len = 0, blen, zlen;
136    char sigletter = 'P';
137    char encletter = 'P';
138    char* p;
139    char* sig = 0;
140    char* zbuf;
141    char* b64;
142    char sigbuf[28+4];   /* Space for "SP " and sha1 */
143    char keybuf[16];
144    char ivec[16];
145    if (n == -2)
146      n = strlen(logbuf);
147    if (encflags & 0x70) {          /* Encrypt check */
148      zbuf = zx_zlib_raw_deflate(cf->ctx, n-1, logbuf, &zlen);
149      switch (encflags & 0x06) {     /* Sign check */
150      case 0x02:      /* Sx plain sha1 */
151        sigletter = 'S';
152        sig = ZX_ALLOC(cf->ctx, 20);
153        SHA1((unsigned char*)zbuf, zlen, (unsigned char*)sig);
154        len = 20;
155        break;
156      case 0x04:      /* Rx RSA-SHA1 signature */
157        sigletter = 'R';
158        LOCK(cf->mx, "logsign-wrln");
159        if (!(log_sign_pkey = cf->log_sign_pkey))
160 	 log_sign_pkey = cf->log_sign_pkey = zxid_read_private_key(cf, "logsign-nopw-cert.pem");
161        UNLOCK(cf->mx, "logsign-wrln");
162        if (!log_sign_pkey)
163 	 break;
164 len = zxsig_data(cf->ctx, zlen, zbuf, &sig, log_sign_pkey, "enc log line", 0);
165        break;
166      case 0x06:      /* Dx DSA-SHA1 signature */
167        ERR("DSA-SHA1 sig not implemented in encrypted mode. Use RSA-SHA1 or none. %x", encflags);
168        break;
169      case 0: break;  /* Px no signing */
170      }
171 
172      switch (encflags & 0x70) {
173      case 0x10:  /* xZ RFC1951 zip + safe base64 */
174        encletter = 'Z';
175        zbuf = zxbus_alloc_zbuf(cf, &zlen, zbuf, len, sig, 0);
176        break;
177      case 0x20:  /* xA RSA-AES */
178        encletter = 'A';
179        zbuf = zxbus_alloc_zbuf(cf, &zlen, zbuf, len, sig, 16);
180        zx_rand(keybuf, 16);
181        AES_set_encrypt_key((unsigned char*)keybuf, 128, &aes_key);
182        memcpy(ivec, zbuf, sizeof(ivec));
183        AES_cbc_encrypt((unsigned char*)zbuf+16, (unsigned char*)zbuf+16, zlen-16, &aes_key, (unsigned char*)ivec, 1);
184        ROUND_UP(zlen, 16);        /* Round up to block size */
185 
186        LOCK(cf->mx, "logenc-wrln");
187        if (!cf->log_enc_cert)
188 	 cf->log_enc_cert = zxid_read_cert(cf, "logenc-nopw-cert.pem");
189        rsa_pkey = zx_get_rsa_pub_from_cert(cf->log_enc_cert, "log_enc_cert");
190        UNLOCK(cf->mx, "logenc-wrln");
191        if (!rsa_pkey)
192 	 break;
193 
194        len = RSA_size(rsa_pkey);
195        sig = ZX_ALLOC(cf->ctx, len);
196        if (RSA_public_encrypt(16, (unsigned char*)keybuf, (unsigned char*)sig, rsa_pkey, RSA_PKCS1_OAEP_PADDING) < 0) {
197 	 ERR("RSA enc fail %x", encflags);
198 	 zx_report_openssl_err("zxbus rsa enc");
199 	 return;
200        }
201        p = ZX_ALLOC(cf->ctx, 2 + len + zlen);
202        p[0] = (len >> 8) & 0xff;
203        p[1] = len & 0xff;
204        memcpy(p+2, sig, len);
205        memcpy(p+2+len, zbuf, zlen);
206        ZX_FREE(cf->ctx, sig);
207        ZX_FREE(cf->ctx, zbuf);
208        zbuf = p;
209        zlen += 2 + len;
210        break;
211      case 0x30:  /* xT RSA-3DES */
212        encletter = 'T';
213        ERR("Enc not implemented %x", encflags);
214        break;
215      case 0x40:  /* xB AES */
216        encletter = 'B';
217        zbuf = zxbus_alloc_zbuf(cf, &zlen, zbuf, len, sig, 16);
218        if (!cf->log_symkey[0])
219 	 zx_get_symkey(cf, "logenc.key", cf->log_symkey);
220        AES_set_encrypt_key((unsigned char*)cf->log_symkey, 128, &aes_key);
221        memcpy(ivec, zbuf, sizeof(ivec));
222        AES_cbc_encrypt((unsigned char*)zbuf+16, (unsigned char*)zbuf+16, zlen-16, &aes_key, (unsigned char*)ivec, 1);
223        ROUND_UP(zlen, 16);        /* Round up to block size */
224        break;
225      case 0x50:  /* xU 3DES */
226        encletter = 'U';
227        ERR("Enc not implemented %x", encflags);
228        break;
229      default:
230        ERR("Enc not implemented %x", encflags);
231        break;
232      }
233 
234      blen = SIMPLE_BASE64_LEN(zlen) + 3 + 1;
235      b64 = ZX_ALLOC(cf->ctx, blen);
236      b64[0] = sigletter;
237      b64[1] = encletter;
238      b64[2] = ' ';
239      p = base64_fancy_raw(zbuf, zlen, b64+3, safe_basis_64, 1<<31, 0, 0, '.');
240      blen = p-b64 + 1;
241      *p = '\n';
242      write2_or_append_lock_c_path(c_path, 0, 0, blen, b64, "zxbus enc", SEEK_END, O_APPEND);
243      return;
244    }
245 
246    /* Plain text, possibly signed. */
247 
248    switch (encflags & 0x06) {
249    case 0x02:   /* SP plain sha1 */
250      strcpy(sigbuf, "SP ");
251      sha1_safe_base64(sigbuf+3, n-1, logbuf);
252      sigbuf[3+27] = ' ';
253      len = 3+27+1;
254      p = sigbuf;
255      break;
256    case 0x04:   /* RP RSA-SHA1 signature */
257      LOCK(cf->mx, "logsign-wrln");
258      if (!(log_sign_pkey = cf->log_sign_pkey))
259        log_sign_pkey = cf->log_sign_pkey = zxid_read_private_key(cf, "logsign-nopw-cert.pem");
260      UNLOCK(cf->mx, "logsign-wrln");
261      if (!log_sign_pkey)
262        break;
263 zlen = zxsig_data(cf->ctx, n-1, logbuf, &zbuf, log_sign_pkey, "log line", 0);
264      len = SIMPLE_BASE64_LEN(zlen) + 4;
265      sig = ZX_ALLOC(cf->ctx, len);
266      strcpy(sig, "RP ");
267      p = base64_fancy_raw(zbuf, zlen, sig+3, safe_basis_64, 1<<31, 0, 0, '.');
268      len = p-sig + 1;
269      *p = ' ';
270      p = sig;
271      break;
272    case 0x06:   /* DP DSA-SHA1 signature */
273      ERR("DSA-SHA1 signature not implemented %x", encflags);
274      break;
275    case 0:      /* Plain logging, no signing, no encryption. */
276      len = 5;
277      p = "PP - ";
278      break;
279    }
280    write2_or_append_lock_c_path(c_path, len, p, n, logbuf, "zxbus sig", SEEK_END, O_APPEND);
281    if (sig)
282      ZX_FREE(cf->ctx, sig);
283  }
284 
285  /*() Helper function for formatting all kinds of logs. */
286 
287  static int zxbus_fmt(zxid_conf* cf,   /* 1 */
288 		      int len, char* logbuf,
289 		      struct timeval* ourts,  /* 2 null allowed, will use current time */
290 		      struct timeval* srcts,  /* 3 null allowed, will use start of unix epoch... */
291 		      const char* ipport,     /* 4 null allowed, -:- or cf->ipport if not given */
292 		      struct zx_str* entid,   /* 5 null allowed, - if not given */
293 		      struct zx_str* msgid,   /* 6 null allowed, - if not given */
294 		      struct zx_str* a7nid,   /* 7 null allowed, - if not given */
295 		      struct zx_str* nid,     /* 8 null allowed, - if not given */
296 		      const char* sigval,     /* 9 null allowed, - if not given */
297 		      const char* res,        /* 10 */
298 		      const char* op,         /* 11 */
299 		      const char* arg,        /* 12 null allowed, - if not given */
300 		      const char* fmt,        /* 13 null allowed as format, ends the line */
301 		      va_list ap)
302  {
303    int n;
304    char* p;
305    char sha1_name[28];
306    struct tm ot;
307    struct tm st;
308    struct timeval ourtsdefault;
309    struct timeval srctsdefault;
310 
311    /* Prepare values */
312 
313    if (!ourts) {
314      ourts = &ourtsdefault;
315      GETTIMEOFDAY(ourts, 0);
316    }
317    if (!srcts) {
318      srcts = &srctsdefault;
319      srctsdefault.tv_sec = 0;
320      srctsdefault.tv_usec = 501000;
321    }
322    GMTIME_R(ourts->tv_sec, ot);
323    GMTIME_R(srcts->tv_sec, st);
324 
325    if (entid && entid->len && entid->s) {
326      sha1_safe_base64(sha1_name, entid->len, entid->s);
327      sha1_name[27] = 0;
328    } else {
329      sha1_name[0] = '-';
330      sha1_name[1] = 0;
331    }
332 
333    if (!ipport) {
334      ipport = cf->ipport;
335      if (!ipport)
336        ipport = "-:-";
337    }
338 
339    /* Format */
340 
341    n = snprintf(logbuf, len-3, ZXBUS_TIME_FMT " " ZXBUS_TIME_FMT
342 		" %s %s"  /* ipport  sha1_name-of-ent */
343 		" %.*s"
344 		" %.*s"
345 		" %.*s"
346 		" %s %s %s %s %s ",
347 		ZXBUS_TIME_ARG(ot, ourts->tv_usec), ZXBUS_TIME_ARG(st, srcts->tv_usec),
348 		ipport, sha1_name,
349 		msgid?msgid->len:1, msgid?msgid->s:"-",
350 		a7nid?a7nid->len:1, a7nid?a7nid->s:"-",
351 		nid?nid->len:1,     nid?nid->s:"-",
352 		errmac_instance, STRNULLCHKD(sigval), res, op, arg?arg:"-");
353    logbuf[len-1] = 0; /* must terminate manually as on win32 nul is not guaranteed */
354    if (n <= 0 || n >= len-3) {
355      if (n < 0)  platform_broken_snprintf(n, __FUNCTION__, len-3, "zxbus msg frame");
356      D("Log buffer too short: %d chars needed", n);
357      if (n <= 0)
358        n = 0;
359      else
360        n = len-3;
361    } else { /* Space left: try printing the format string as well! */
362      p = logbuf+n;
363      if (fmt && fmt[0]) {
364        n = vsnprintf(p, len-n-2, fmt, ap);
365        logbuf[len-1] = 0;  /* must terminate manually as on win32 nul term is not guaranteed */
366        if (n <= 0 || n >= len-(p-logbuf)-2) {
367 	 if (n < 0)  platform_broken_snprintf(n, __FUNCTION__, len-n-2, fmt);
368 	 D("Log buffer truncated during format print: %d chars needed", n);
369 	 if (n <= 0)
370 	   n = p-logbuf;
371 	 else
372 	   n = len-(p-logbuf)-2;
373        } else
374 	 n += p-logbuf;
375      } else {
376        logbuf[n++] = '-';
377      }
378    }
379    logbuf[n++] = '\n';
380    logbuf[n] = 0;
381    /*logbuf[len-1] = 0;*/
382    return n;
383  }
384  #endif
385 
386  /*() Clear current PDU from read buffer, moving the data after
387   * it (i.e. next PDU in buffer) in position to be read. */
388 
389  /* Called by:  zxbus_close x3, zxbus_listen_msg x4, zxbus_open_bus_url x2, zxbus_send_cmdf x3 */
zxbus_shift_read_buf(zxid_conf * cf,struct zxid_bus_url * bu,struct stomp_hdr * stomp)390  static void zxbus_shift_read_buf(zxid_conf* cf, struct zxid_bus_url* bu, struct stomp_hdr* stomp)
391  {
392    if (stomp->end_of_pdu) {
393      memmove(bu->m, stomp->end_of_pdu, bu->ap-stomp->end_of_pdu);
394      bu->ap = bu->m + (bu->ap-stomp->end_of_pdu);
395      D("shifted read_buf(%.*s)", (int)(bu->ap-bu->m), bu->m);
396    }
397    stomp->end_of_pdu = 0;
398  }
399 
400  /*() Read and parse a frame from STOMP 1.1 connection (from zxbusd).
401   * Blocks until frame has been read.
402   *
403   * Return:: 1 on success, 0 on failure.
404   *
405   * In case of failure, caller should close the connection. The PDU
406   * data is left in bu->m, possibly with the following pdu as well. The
407   * caller should clean the buffer without loosing the next pdu
408   * fragment before calling this function again. For example:
409   *   memmove(bu->m, stomp->end_of_pdu, bu->ap-stomp->end_of_pdu);
410   *   bu->ap = bu->m + (bu->ap-stomp->end_of_pdu);
411   *   stomp->end_of_pdu = 0;
412   * or by calling
413   *   zxbus_shift_read_buf(cf, bu, stomp);
414   *
415   * The parsed headers are returned in the struct stomp_hdr. */
416 
417  /* Called by:  zxbus_close, zxbus_listen_msg, zxbus_open_bus_url, zxbus_send_cmdf */
zxbus_read_stomp(zxid_conf * cf,struct zxid_bus_url * bu,struct stomp_hdr * stomp)418  int zxbus_read_stomp(zxid_conf* cf, struct zxid_bus_url* bu, struct stomp_hdr* stomp)
419  {
420    int need = 0, len = 0, got;
421    char* hdr;
422    char* h;
423    char* v;
424    char* p;
425 
426    memset(stomp, 0, sizeof(struct stomp_hdr));
427 
428    while (bu->ap - bu->m < ZXBUS_BUF_SIZE) {
429      D("read, already buf(%.*s) need=%d len=%d buf_avail=%d", (int)(bu->ap-bu->m), bu->m, need, (int)(bu->ap-bu->m), (int)(ZXBUS_BUF_SIZE-(bu->ap - bu->m)));
430      if (need || bu->ap == bu->m) {
431  #ifdef USE_OPENSSL
432        if (bu->ssl) {
433 	 got = SSL_read(bu->ssl, bu->ap, ZXBUS_BUF_SIZE - (bu->ap - bu->m));
434 	 if (got < 0) {
435 	   ERR("SSL_read(%x) bu_%p: (%d) %d %s", bu->fd, bu, got, errno, STRERROR(errno));
436 	   zx_report_openssl_err("zxbus_read-ssl");
437 	   return 0;
438 	 }
439        } else {
440 	 got = recv((SOCKET)bu->fd, bu->ap, ZXBUS_BUF_SIZE - (bu->ap - bu->m), 0);
441 	 if (got < 0) {
442 	   ERR("recv(%x) bu_%p: %d %s", bu->fd, bu, errno, STRERROR(errno));
443 	   return 0;
444 	 }
445        }
446  #else
447        got = recv((SOCKET)bu->fd, bu->ap, ZXBUS_BUF_SIZE - (bu->ap - bu->m), 0);
448        if (got < 0) {
449 	 ERR("recv: %d %s", errno, STRERROR(errno));
450 	 return 0;
451        }
452  #endif
453        if (!got) {
454 	 D("recv: returned empty, gotten=%ld", (long)(bu->ap - bu->m));
455 	 return 0;
456        }
457        HEXDUMP("read:", bu->ap, bu->ap+got, /*16*/ 256);
458        bu->ap += got;
459      }
460      for (p = bu->m; p < bu->ap && ONE_OF_2(*p, '\n', '\r'); ++p) ;
461      if (p > bu->m) {
462        /* Wipe out initial newlines */
463        memmove(bu->m, p, bu->ap - p);
464        bu->ap -= p - bu->m;
465        p = bu->m;
466      }
467      if (bu->ap - p < STOMP_MIN_PDU_SIZE)
468        goto read_more;
469 
470      /* Extract command (always in beginning of buf) */
471 
472      hdr = memchr(p, '\n', bu->ap - p);
473      if (!hdr || ++hdr == bu->ap)
474        goto read_more;
475      p = hdr;
476 
477      /* Decode headers
478       * 01234 5 6 7
479       * STOMP\n\n\0
480       *         ^-p
481       * 01234 5 6 7 8 9
482       * STOMP\r\n\r\n\0
483       *           ^-p
484       * STOMP\nhost:foo\n\n\0
485       *        ^-p        ^-pp
486       * STOMP\r\nhost:foo\r\n\r\n\0
487       *          ^-p          ^-pp
488       * STOMP\nhost:foo\naccept-version:1.1\n\n\0
489       *        ^-p       ^-pp                 ^-ppp
490       * STOMP\r\nhost:foo\r\naccept-version:1.1\r\n\r\n\0
491       *          ^-p         ^-pp                   ^-ppp
492       */
493 
494      while (!ONE_OF_2(*p,'\n','\r')) { /* Empty line separates headers from body. */
495        h = p;
496        p = memchr(p, '\n', bu->ap - p);
497        if (!p || ++p == bu->ap)
498 	 goto read_more;
499        v = memchr(h, ':', p-h);
500        if (!v) {
501 	 ERR("Header missing colon. hdr(%.*s)", (int)(bu->ap-h), h);
502 	 return 0;
503        }
504        ++v; /* skip : */
505 
506  #define HDR(hdr, field, val) } else if (!memcmp(h, hdr, sizeof(hdr)-1)) { if (!stomp->field) stomp->field = (val)
507 
508        if (!memcmp(h, "content-length:", sizeof("content-length:")-1)) {
509 	 if (!stomp->len) stomp->len = len = atoi(v); D("len=%d", stomp->len);
510        HDR("host:",           host,      v);
511        HDR("receipt:",        receipt,   v);
512        HDR("receipt-id:",     rcpt_id,   v);
513        HDR("zx-rcpt-sig:",    zx_rcpt_sig, v);
514        HDR("version:",        vers,      v);
515        HDR("accept-version:", acpt_vers, v);
516        HDR("transaction:",    tx_id,     v);
517        HDR("login:",          login,     v);
518        HDR("passcode:",       pw,        v);
519        HDR("session:",        session,   v);
520        HDR("id:",             subs_id,   v);
521        HDR("subscription:",   subsc,     v);
522        HDR("server:",         server,    v);
523        HDR("ack:",            ack,       v);
524        HDR("message-id:",     msg_id,    v);
525        HDR("destination:",    dest,      v);
526        HDR("heart-beat:",     heart_bt,  v);
527        } else if (!memcmp(h, "message:", sizeof("message:")-1)) { /* ignore */
528        } else if (!memcmp(h, "content-type:", sizeof("content-type:")-1)) { /* ignore */
529        } else {
530 	 D("Unknown header(%.*s) ignored.", (int)(p-h), h);
531        }
532      }
533 
534      /* Now body */
535 
536      if (*p == '\r') ++p;
537      stomp->body = ++p;
538 
539      if (len) {
540        if (len < bu->ap - p) {
541 	 /* Got complete with content-length */
542 	 p += len;
543 	 if (!*p++)
544 	   goto done;
545 	 ERR("No nul to terminate body. %d",0);
546 	 return 0;
547        } else {
548 	 goto read_more;
549        }
550      } else {
551        /* Scan until nul */
552        while (1) {
553 	 if (bu->ap - p < 1) {   /* too little, need more */
554 	   goto read_more;
555 	 }
556 	 if (!*p++) {
557 	   stomp->len = p - stomp->body - 1;
558 	   goto done;
559 	 }
560        }
561      }
562    read_more:
563      need = 1;
564      continue;
565    }
566    if (bu->ap - bu->m >= ZXBUS_BUF_SIZE) {
567      ERR("PDU does not fit in buffer %d", (int)(bu->ap-bu->m));
568      return 0;
569    }
570   done:
571    stomp->end_of_pdu = p;
572    return 1;
573  }
574 
575  /*() ACK a message to STOMP 1.1 connection.
576   * N.B. ACK is not a command. Thus no RECEIPT is expected from server
577   * end (ACK really is the receipt for MESSAGE sent by server).
578   *
579   * Returns:: zero on failure and 1 on success. */
580 
581  /* Called by:  zxbus_listen_msg */
zxbus_ack_msg(zxid_conf * cf,struct zxid_bus_url * bu,struct stomp_hdr * stompp)582  int zxbus_ack_msg(zxid_conf* cf, struct zxid_bus_url* bu, struct stomp_hdr* stompp)
583  {
584    int len;
585    char sigbuf[1024];
586    char buf[1024];
587    int subs_id_len, msg_id_len;
588    subs_id_len = strchr(stompp->subs_id, '\n') - stompp->subs_id;
589    msg_id_len = strchr(stompp->msg_id, '\n') - stompp->msg_id;
590 
591    zxbus_mint_receipt(cf, sizeof(sigbuf), sigbuf,
592 		      msg_id_len, stompp->msg_id,
593 		      -2, stompp->dest,
594 		      -1, bu->eid,  /* entity to which we issue this receipt */
595 		      stompp->len, stompp->body);
596    len = snprintf(buf, sizeof(buf), "ACK\nsubscription:%.*s\nmessage-id:%.*s\nzx-rcpt-sig:%s\n\n%c",
597 		  subs_id_len, stompp->subs_id, msg_id_len, stompp->msg_id, sigbuf, 0);
598    HEXDUMP(" ack:", buf, buf+len, /*16*/ 256);
599  #ifdef USE_OPENSSL
600    if (bu->ssl)
601      SSL_write(bu->ssl, buf, len);
602    else
603  #endif
604      send_all_socket(bu->fd, buf, len);
605    return 1;
606  }
607 
608  /*() NACK a message to STOMP 1.1 connection, signalling trouble persisting it.
609   * N.B. NACK is not a command. Thus no RECEIPT is expected from server
610   * end (NACK really is the receipt for MESSAGE sent by server).
611   *
612   * Returns:: zero on failure and 1 on success. */
613 
614  /* Called by:  zxbus_listen_msg x2 */
zxbus_nack_msg(zxid_conf * cf,struct zxid_bus_url * bu,struct stomp_hdr * stompp,const char * errmsg)615  int zxbus_nack_msg(zxid_conf* cf, struct zxid_bus_url* bu, struct stomp_hdr* stompp, const char* errmsg)
616  {
617    int len;
618    char buf[1024];
619    int subs_id_len, msg_id_len;
620    subs_id_len = strchr(stompp->subs_id, '\n') - stompp->subs_id;
621    msg_id_len = strchr(stompp->msg_id, '\n') - stompp->msg_id;
622 
623    len = snprintf(buf, sizeof(buf), "NACK\nsubscription:%.*s\nmessage-id:%.*s\nmessage:%s\n\n%c",
624 		  subs_id_len, stompp->subs_id, msg_id_len, stompp->msg_id, errmsg, 0);
625    HEXDUMP("nack:", buf, buf+len, /*16*/ 256);
626  #ifdef USE_OPENSSL
627    if (bu->ssl)
628      SSL_write(bu->ssl, buf, len);
629    else
630  #endif
631      send_all_socket(bu->fd, buf, len);
632    return 1;
633  }
634 
635  /*() Listen for a MESSAGE from the STOMP 1.1 connection and ACK it.
636   * Returns pointer to the body (which is nul terminated as the
637   * STOMP 1.1 frame ends in nul). Returns NULL on error.
638   * N.B. Depending on situation, you may NOT want automatic ACK.
639   * In that case you should call zxbus_read() and zxbus_ack_msg()
640   * directly and do your persistence in between.
641   *
642   * See also:: zxbus_persist() */
643 
644  /* Called by:  zxbuslist_main */
zxbus_listen_msg(zxid_conf * cf,struct zxid_bus_url * bu)645  char* zxbus_listen_msg(zxid_conf* cf, struct zxid_bus_url* bu)
646  {
647    struct stomp_hdr stomp;
648    int dest_len;
649    char* dest;
650    char c_path[ZXID_MAX_BUF];
651    if (zxbus_read_stomp(cf, bu, &stomp)) {
652      if (!memcmp(bu->m, "MESSAGE", sizeof("MESSAGE")-1)) {
653        if (zxbus_persist_flag) {
654 	 if (!(dest = stomp.dest)) {
655 	   ERR("SEND MUST specify destination header, i.e. channel to send to. %p", dest);
656 	   zxbus_nack_msg(cf, bu, &stomp, "no destination channel. server error.");
657 	   zxbus_shift_read_buf(cf, bu, &stomp);
658 	   return 0;
659 	 }
660 	 dest_len = (char*)memchr(dest, '\n', bu->ap - dest) - dest;  /* there will be \n in STOMP header */
661 	 DD("persist(%.*s)", dest_len, dest);
662 
663 	 if (!zxbus_persist_msg(cf, sizeof(c_path), c_path, dest_len, dest, bu->ap - bu->m,bu->m)) {
664 	   zxbus_nack_msg(cf, bu, &stomp, "difficulty in persisting (temporary client/local err)");
665 	   zxbus_shift_read_buf(cf, bu, &stomp);
666 	   return 0;
667 	 }
668        }
669        if (zxbus_verbose) {
670 	 if (zxbus_ascii_color>1) {
671 	   if (zxbus_verbose>1) {
672 	     fprintf(stdout, "\e[42m%.*s\e[0m\n", (int)(bu->ap - bu->m), bu->m);
673 	   } else {
674 	     fprintf(stdout, "\e[42m%.*s\e[0m\n", stomp.len, stomp.body);
675 	   }
676 	 } else {
677 	   if (zxbus_verbose>1) {
678 	     fprintf(stdout, "%.*s\n", (int)(bu->ap - bu->m), bu->m);
679 	   } else {
680 	     fprintf(stdout, "%.*s\n", stomp.len, stomp.body);
681 	   }
682 	 }
683        }
684        zxbus_ack_msg(cf, bu, &stomp);
685        zxbus_shift_read_buf(cf, bu, &stomp);
686        return stomp.body;  /* normal successful return */
687      } else {
688        ERR("Unknown command received(%.*s)", (int)(bu->ap - bu->m), bu->m);
689        zxbus_shift_read_buf(cf, bu, &stomp);
690        return 0;
691      }
692    } else {
693      ERR("Read from %s failed.", bu->s);
694      return 0;
695    }
696  }
697 
698  #ifdef USE_OPENSSL
699  //int zxbus_cert_verify_cb(X509_STORE_CTX* st_ctx, void* arg) {  zxid_conf* cf = arg;  return 0; }
700 
701  /* Called by: */
zx_ssl_info_cb(const SSL * ssl,int where,int ret)702  static void zx_ssl_info_cb(const SSL *ssl, int where, int ret)
703  {
704    const char *str;
705 
706    if ((where & ~SSL_ST_MASK) & SSL_ST_CONNECT) str="SSL_connect";
707    else if ((where & ~SSL_ST_MASK) & SSL_ST_ACCEPT) str="SSL_accept";
708    else str="undefined";
709 
710    if (where & SSL_CB_LOOP) {
711      D("%s:%s",str,SSL_state_string_long(ssl));
712    } else if (where & SSL_CB_ALERT) {
713      str=(where & SSL_CB_READ)?"read":"write";
714      D("SSL3 alert %s:%s:%s",str,SSL_alert_type_string_long(ret),SSL_alert_desc_string_long(ret));
715    } else if (where & SSL_CB_EXIT) {
716      if (ret == 0)
717        D("%s:failed in %s",str,SSL_state_string_long(ssl));
718      else if (ret < 0)
719        D("%s:error in %s",str,SSL_state_string_long(ssl));
720    }
721  }
722  #endif
723 
724  /*() Open a bus_url, i.e. STOMP 1.1 connection to zxbusd.
725   *
726   * return:: 0 on failure, 1 on success. */
727 
728  /* Called by:  zxbus_send_cmd, zxbuslist_main */
zxbus_open_bus_url(zxid_conf * cf,struct zxid_bus_url * bu)729  int zxbus_open_bus_url(zxid_conf* cf, struct zxid_bus_url* bu)
730  {
731  #ifdef USE_OPENSSL
732    X509* peer_cert;
733    zxid_entity* meta;
734  #endif
735    long vfy_err;
736    int len,tls;
737    char buf[1024];
738    struct hostent* he;
739    struct sockaddr_in sin;
740    struct stomp_hdr stomp;
741    int host_len;
742    char* proto;
743    char* host;
744    char* port;
745    char* local;
746    char* qs;
747    char* eid;
748    char* p;
749 
750    /* Parse the bus_url */
751 
752    if (!bu || !bu->s || !*bu->s) {
753      ERR("Null arguments or empty bus_url supplied %p", bu);
754      return 0;
755    }
756 
757    host = strstr(bu->s, "://");
758    if (!host) {
759      ERR("Malformed bus_url(%s): missing protocol field", bu->s);
760      proto = "stomps:";
761      host = bu->s;
762    } else {
763      proto = bu->s;
764      host += 3;
765    }
766 
767    if (!memcmp(proto, "stomps:", sizeof("stomps:")-1)) {
768      tls = 1;
769    } else if (!memcmp(proto, "stomp:", sizeof("stomp:")-1)) {
770      tls = 0;
771    } else {
772      ERR("Unknown protocol(%.*s)", 6, proto);
773      return 0;
774    }
775 
776    port = strchr(host, ':');
777    if (!port) {
778      port = tls ? ":2229/" : ":2228/";  /* ZXID default ports for stomps: and plain stomp: */
779      local = strchr(host, '/');
780      if (!local) {
781        qs = strchr(host, '?');
782        if (!qs) {
783 	 host_len = strlen(host);
784        } else {
785 	 host_len = qs-host;
786        }
787      } else {
788        host_len = local-host;
789        qs = strchr(local, '?');
790      }
791    } else {
792      host_len = port-host;
793      local = strchr(port, '/');
794      if (!local) {
795        qs = strchr(port, '?');
796      } else {
797        qs = strchr(local, '?');
798      }
799    }
800 
801    bu->m = bu->ap = ZX_ALLOC(cf->ctx, ZXBUS_BUF_SIZE);
802 
803    memcpy(bu->m, host, MIN(host_len, ZXBUS_BUF_SIZE-2));
804    bu->m[MIN(host_len, ZXBUS_BUF_SIZE-2)] = 0;
805    he = gethostbyname(bu->m);
806    if (!he) {
807      ERR("hostname(%s) did not resolve(%d) bu->s(%s) host_len=%d %d host(%.*s) %p port(%s) %p", bu->m, h_errno, bu->s, host_len, MIN(host_len, ZXBUS_BUF_SIZE-2), host_len, host, host, port, port);
808      exit(5);
809    }
810 
811    memset(&sin, 0, sizeof(sin));
812    sin.sin_family = AF_INET;
813    sin.sin_port = htons(atoi(port+1));
814    memcpy(&(sin.sin_addr.s_addr), he->h_addr, sizeof(sin.sin_addr.s_addr));
815 
816    if ((bu->fd = (fdtype)socket(AF_INET, SOCK_STREAM, 0)) == (fdtype)-1) {
817      ERR("Unable to create socket(AF_INET, SOCK_STREAM, 0) %d %s", errno, STRERROR(errno));
818      return 0;
819    }
820 
821  #if 0
822    nonblock(bu->fd);
823    if (nkbuf)
824      setkernelbufsizes(bu->fd, nkbuf, nkbuf);
825  #endif
826 
827    D("connecting(%x) hs(%s)", bu->fd, bu->s);
828    if ((connect((SOCKET)bu->fd, (struct sockaddr*)&sin, sizeof(sin)) == -1) && (errno != EINPROGRESS)) {
829      ERR("Connection to %s failed: %d %s", bu->s, errno, STRERROR(errno));
830      goto errout;
831    }
832 
833    D("connected(%x) at TCP layer hs(%s)", bu->fd, bu->s);
834 
835    if (tls) {
836  #ifdef USE_OPENSSL
837      if (!cf->ssl_ctx) {
838        SSL_load_error_strings();
839        SSL_library_init();
840  #if 0
841        cf->ssl_ctx = SSL_CTX_new(SSLv23_method());
842  #else
843        cf->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
844  #endif
845      }
846      if (!cf->ssl_ctx) {
847        ERR("TLS/SSL connection to(%s) can not be made. SSL context initialization problem", bu->s);
848        zx_report_openssl_err("open_bus_url-ssl_ctx");
849        goto errout;
850      } else {
851        if (errmac_debug>1) {
852 	D("OpenSSL header-version(%lx) lib-version(%lx)(%s) %s %s %s %s", OPENSSL_VERSION_NUMBER, SSLeay(), SSLeay_version(SSLEAY_VERSION), SSLeay_version(SSLEAY_CFLAGS), SSLeay_version(SSLEAY_BUILT_ON), SSLeay_version(SSLEAY_PLATFORM), SSLeay_version(SSLEAY_DIR));
853 	SSL_CTX_set_info_callback(cf->ssl_ctx, zx_ssl_info_cb);
854       }
855       SSL_CTX_set_mode(cf->ssl_ctx, SSL_MODE_AUTO_RETRY);  /* R/W only return when complete. */
856       /* Verification strategy: do not attempt verification at SSL layer. Instead
857        * check the result afterwards against metadata based cert. */
858       SSL_CTX_set_verify(cf->ssl_ctx, SSL_VERIFY_NONE,0);
859       //SSL_CTX_set_verify(cf->ssl_ctx, SSL_VERIFY_PEER,0);
860       //SSL_CTX_set_cert_verify_callback(cf->ssl_ctx, zxbus_cert_verify_cb, cf);
861       /*SSL_CTX_load_verify_locations() SSL_CTX_set_client_CA_list(3) SSL_CTX_set_cert_store(3) */
862       LOCK(cf->mx, "logenc wrln");
863       if (!cf->enc_cert)
864 	cf->enc_cert = zxid_read_cert(cf, "enc-nopw-cert.pem");
865       if (!cf->enc_pkey)
866 	cf->enc_pkey = zxid_read_private_key(cf, "enc-nopw-cert.pem");
867       UNLOCK(cf->mx, "logenc wrln");
868       if (!SSL_CTX_use_certificate(cf->ssl_ctx, cf->enc_cert)) {
869 	ERR("TLS/SSL connection to(%s) can not be made. SSL certificate problem", bu->s);
870 	zx_report_openssl_err("open_bus_url-cert");
871 	goto errout;
872       }
873       if (!SSL_CTX_use_PrivateKey(cf->ssl_ctx, cf->enc_pkey)) {
874 	ERR("TLS/SSL connection to(%s) can not be made. SSL private key problem", bu->s);
875 	zx_report_openssl_err("open_bus_url-privkey");
876 	goto errout;
877       }
878       if (!SSL_CTX_check_private_key(cf->ssl_ctx)) {
879 	ERR("TLS/SSL connection to(%s) can not be made. SSL certificate-private key consistency problem", bu->s);
880 	zx_report_openssl_err("open_bus_url-chk-privkey");
881 	goto errout;
882       }
883       /*SSL_CTX_add_extra_chain_cert(cf->ssl_ctx, ca_cert);*/
884     }
885     bu->ssl = SSL_new(cf->ssl_ctx);
886     if (!bu->ssl) {
887       ERR("TLS/SSL connection to(%s) can not be made. SSL object initialization problem", bu->s);
888       zx_report_openssl_err("open_bus_url-ssl");
889       goto errout;
890     }
891     if (!SSL_set_fd(bu->ssl, (int)bu->fd)) {
892       ERR("TLS/SSL connection to(%s) can not be made. SSL fd(%x) initialization problem", bu->s, bu->fd);
893       zx_report_openssl_err("open_bus_url-set_fd");
894       goto sslerrout;
895     }
896 
897     switch (vfy_err = SSL_get_error(bu->ssl, SSL_connect(bu->ssl))) {
898     case SSL_ERROR_NONE: break;
899       /*case SSL_ERROR_WANT_ACCEPT:  documented, but undeclared */
900     case SSL_ERROR_WANT_READ:
901     case SSL_ERROR_WANT_CONNECT:
902     case SSL_ERROR_WANT_WRITE:
903     default:
904       ERR("TLS/SSL connection to(%s) can not be made. SSL connect or handshake problem (%ld)", bu->s, vfy_err);
905       zx_report_openssl_err("open_bus_url-ssl_connect");
906       send((SOCKET)bu->fd, SSL_ENCRYPTED_HINT, sizeof(SSL_ENCRYPTED_HINT)-1, 0);
907       goto sslerrout;
908     }
909 
910     if (errmac_debug>1) D("SSL_version(%s) cipher(%s)",SSL_get_version(bu->ssl),SSL_get_cipher(bu->ssl));
911 
912     vfy_err = SSL_get_verify_result(bu->ssl);
913     switch (vfy_err) {
914     case X509_V_OK: break;
915     case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT:
916       D("TLS/SSL connection to(%s) made, but certificate err. (%ld)", bu->s, vfy_err);
917       zx_report_openssl_err("open_bus_url-verify_res");
918       break;
919     default:
920       ERR("TLS/SSL connection to(%s) made, but certificate not acceptable. (%ld)", bu->s, vfy_err);
921       zx_report_openssl_err("open_bus_url-verify_res");
922       goto sslerrout;
923     }
924 
925     if (!(peer_cert = SSL_get_peer_certificate(bu->ssl))) {
926       ERR("TLS/SSL connection to(%s) made, but peer did not send certificate", bu->s);
927       zx_report_openssl_err("open_bus_url-peer_cert");
928       goto sslerrout;
929     }
930     meta = zxid_get_ent(cf, bu->eid);
931     if (!meta) {
932       ERR("Unable to find metadata for eid(%s) in verify peer cert", bu->eid);
933       goto sslerrout;
934     }
935     if (!meta->enc_cert) {
936       ERR("Metadata for eid(%s) does not contain enc cert", bu->eid);
937       goto sslerrout;
938     }
939     if (X509_cmp(meta->enc_cert, peer_cert)) {
940       ERR("Peer certificate does not match metadata for eid(%s)", bu->eid);
941       D("compare: %d", memcmp(meta->enc_cert->sha1_hash, peer_cert->sha1_hash, SHA_DIGEST_LENGTH));
942       PEM_write_X509(ERRMAC_DEBUG_LOG, peer_cert);
943       goto sslerrout;
944     }
945     /* *** should we free peer_cert? */
946     /*SSL_get_verify_result(bu->ssl); no need as SSL_VERIFY_PEER causes SSL_connect() to fail. */
947 #else
948     ERR("TLS/SSL connection to(%s) can not be made. SSL not compiled in", bu->s);
949     goto errout;
950 #endif
951   }
952 
953   eid = zxid_my_ent_id_cstr(cf);
954   if (!eid)
955     return 0;
956   for (p = eid; *p; ++p)
957     if (*p == ':')  /* deal with colon that is forbidden character in STOMP 1.1 header */
958       *p = '|';
959 
960   if (cf->bus_pw) {
961     len = snprintf(buf, sizeof(buf)-1, "STOMP\naccept-version:1.1\nhost:%s\nlogin:%s\npasscode:%s\n\n%c", bu->m, eid, cf->bus_pw, 0);
962   } else {
963     len = snprintf(buf, sizeof(buf)-1, "STOMP\naccept-version:1.1\nhost:%s\nlogin:%s\n\n%c", bu->m, eid, 0);
964   }
965   HEXDUMP("conn:", buf, buf+len, /*16*/ 256);
966 #ifdef USE_OPENSSL
967   if (bu->ssl)
968     SSL_write(bu->ssl, buf, len);
969   else
970 #endif
971     send_all_socket(bu->fd, buf, len);
972 
973   memset(&stomp, 0, sizeof(struct stomp_hdr));
974   if (zxbus_read_stomp(cf, bu, &stomp)) {
975     if (!memcmp(bu->m, "CONNECTED", sizeof("CONNECTED")-1)) {
976       zxbus_shift_read_buf(cf, bu, &stomp);
977       D("STOMP got CONNECTED bu-s(%s)", bu->s);
978       return 1;
979     }
980     zxbus_shift_read_buf(cf, bu, &stomp);
981   }
982   ERR("Connection to %s failed. Other end did not send CONNECTED", bu->s);
983 #ifdef USE_OPENSSL
984  sslerrout:
985   if (bu->ssl) {
986     SSL_shutdown(bu->ssl);
987     SSL_free(bu->ssl);
988     bu->ssl = 0;
989   }
990 #endif
991  errout:
992   closesocket((SOCKET)bu->fd);
993   bu->fd = 0;
994   return 0;
995 }
996 
997 /*() SEND a STOMP 1.1 DISCONNECT to audit bus and wait for RECEIPT.
998  *
999  * Returns:: zero on failure and 1 on success. Connection is closed in either case. */
1000 
1001 /* Called by:  zxbus_close_all */
zxbus_close(zxid_conf * cf,struct zxid_bus_url * bu)1002 int zxbus_close(zxid_conf* cf, struct zxid_bus_url* bu)
1003 {
1004   int len;
1005   char buf[1024];
1006   struct stomp_hdr stomp;
1007 
1008   D("closing(%x) bu_%p", bu->fd, bu);
1009 
1010   if (!bu || !bu->s || !bu->s[0] || !bu->fd)
1011     return 0;         /* No bus_url configured means audit bus reporting is disabled. */
1012 
1013   /* *** implement intelligent lbfo algo */
1014 
1015   D("disconnecting(%p) bu->s(%s)", bu, bu->s);
1016 
1017   len = snprintf(buf, sizeof(buf), "DISCONNECT\nreceipt:%d\n\n%c", bu->cur_rcpt-1, 0);
1018   send_all_socket(bu->fd, buf, len);
1019 
1020   memset(&stomp, 0, sizeof(struct stomp_hdr));
1021   if (zxbus_read_stomp(cf, bu, &stomp)) {
1022     if (!memcmp(bu->m, "RECEIPT", sizeof("RECEIPT")-1)) {
1023       if (atoi(stomp.rcpt_id) == bu->cur_rcpt - 1) {
1024 	zxbus_shift_read_buf(cf, bu, &stomp);
1025 	D("DISCONNECT got RECEIPT %d", bu->cur_rcpt-1);
1026 #ifdef USE_OPENSSL
1027 	if (bu->ssl) {
1028 	  SSL_shutdown(bu->ssl);
1029 	  SSL_free(bu->ssl);
1030 	  bu->ssl = 0;
1031 	}
1032 #endif
1033 	closesocket((SOCKET)bu->fd);
1034 	bu->fd = 0;
1035 	return 1;
1036       } else {
1037 	ERR("DISCONNECT to %s failed. RECEIPT number(%.*s)=%d mismatch cur_rcpt-1=%d", bu->s, (int)(bu->ap - stomp.rcpt_id), stomp.rcpt_id, atoi(stomp.rcpt_id), bu->cur_rcpt-1);
1038 	zxbus_shift_read_buf(cf, bu, &stomp);
1039 	goto errout;
1040       }
1041     } else {
1042       ERR("DISCONNECT to %s failed. Other end did not send RECEIPT(%.*s)", bu->s, (int)(bu->ap - bu->m), bu->m);
1043       zxbus_shift_read_buf(cf, bu, &stomp);
1044     }
1045   } else {
1046     ERR("DISCONNECT to %s failed. Other end did not send RECEIPT. Read error. Probably connection drop.", bu->s);
1047   }
1048  errout:
1049 #ifdef USE_OPENSSL
1050   if (bu->ssl) {
1051     SSL_shutdown(bu->ssl);
1052     SSL_free(bu->ssl);
1053     bu->ssl = 0;
1054   }
1055 #endif
1056   closesocket((SOCKET)bu->fd);
1057   bu->fd = 0;
1058   return 0;
1059 }
1060 
1061 /*() SEND a STOMP 1.1 DISCONNECT to audit bus and wait for RECEIPT.
1062  * Returns:: nothing. Ignores any errors (but errors cause fd to be closed). */
1063 
1064 /* Called by:  zxbuslist_main, zxbustailf_main */
zxbus_close_all(zxid_conf * cf)1065 void zxbus_close_all(zxid_conf* cf)
1066 {
1067   struct zxid_bus_url* bu;
1068   for (bu = cf->bus_url; bu; bu = bu->n)
1069     zxbus_close(cf, bu);
1070 }
1071 
1072 /*() Log successful receipt (the message should have been logged earlier separately)
1073  *
1074  * cf:: zxid configuration object
1075  * bu:: URL and eid of the destination audit bus node
1076  * mid:: message ID
1077  * dest:: Destination channel where message was sent
1078  * sha1_buf:: The sha1 over the message as was used to log the message in issue directory
1079  * rcpt_len:: Length of the receipt data returned by remote
1080  * rcpt:: Receipt data returned by remote
1081  *
1082  * Log format is as follows
1083  *   R1 YYYYMMDD-HHMMSS.sss URL SHA1-OF-EID MID CHANNEL SHA1-OF-MSG INST O K RCPT receipt_data
1084  * where receipt_data is like
1085  *   AB1 https://buslist.zxid.org/?o=B ACK RP 20120923-170431.868 76 3aSMhrZHtsviQnl3jnb8swYuxe_5uRnegGP0_i-hgPD6pzNkLtJdC7_qA7Ry-Iz1_cSDR7L91Oe9qgQZ64CzqC1qb0l5sSVoHNVQAzUWXgXOuHvXEgkMheAoLAUT8SKM_H9cUlPCrgCkVFWPXcLAR2FHAW7sNrGe7Mcm4MFFXqM.
1086  */
1087 
1088 /* Called by:  zxbus_send_cmdf */
zxbus_log_receipt(zxid_conf * cf,struct zxid_bus_url * bu,int mid_len,const char * mid,int dest_len,const char * dest,const char * sha1_buf,int rcpt_len,const char * rcpt)1089 static void zxbus_log_receipt(zxid_conf* cf, struct zxid_bus_url* bu, int mid_len, const char* mid, int dest_len, const char* dest, const char* sha1_buf, int rcpt_len, const char* rcpt)
1090 {
1091   int len;
1092   struct tm ot;
1093   struct timeval ourts;
1094   char sha1_name[28];
1095   char buf[1024];
1096   char c_path[ZXID_MAX_BUF];
1097 
1098   GETTIMEOFDAY(&ourts, 0);
1099   GMTIME_R(ourts.tv_sec, ot);
1100   sha1_safe_base64(sha1_name, -2, bu->eid);
1101   sha1_name[27] = 0;
1102 
1103   if (!mid)
1104     mid_len = 0;
1105   if (mid_len == -1)
1106     mid_len = strlen(mid);
1107   else if (mid_len == -2)
1108     mid_len = strchr(mid, '\n') - mid;
1109 
1110   if (!dest)
1111     dest_len = 0;
1112   if (dest_len == -1)
1113     dest_len = strlen(dest);
1114   else if (dest_len == -2)
1115     dest_len = strchr(dest, '\n') - dest;
1116 
1117   len = snprintf(buf, sizeof(buf)-1, "R1 " ZXLOG_TIME_FMT " "
1118 		 " %s %s"  /* url  sha1_name-of-ent */
1119 		 " %.*s %.*s %s"  /* mid, sha1 of the message (see zxlog_blob() call), dest */
1120 		 " %s %s %s %s"
1121 		 " %.*s\n",
1122 		 ZXLOG_TIME_ARG(ot, ourts.tv_usec),
1123 		 bu->s, sha1_name,
1124 		 mid_len, mid, dest_len, dest, sha1_buf,
1125 		 errmac_instance, "O", "K", "RCPT",
1126 		 rcpt_len, rcpt);
1127   buf[sizeof(buf)-1] = 0; /* must terminate manually as on win32 nul is not guaranteed */
1128   if (len < 0) platform_broken_snprintf(len, __FUNCTION__, sizeof(buf)-1, "zxbus receipt frame");
1129   name_from_path(c_path, sizeof(c_path), "%s" ZXID_LOG_DIR "rcpt", cf->cpath);
1130   write2_or_append_lock_c_path(c_path, len, buf, 0,0, "zxbus_send_cmdf",SEEK_END,O_APPEND);
1131 }
1132 
1133 /*() Send the specified STOMP 1.1 message to audit bus and wait for RECEIPT.
1134  * Blocks until the transaction completes (or fails). Figures out
1135  * from configuration, which bus daemon to contact (looks at bus_urls).
1136  * The fmt must contain command, headers, and double newline that
1137  * separates the body.
1138  * Will also log the message to /var/zxid/buscli/issue/SUCCINCT/wir/SHA1
1139  * and receipt to /var/zxid/buscli/log/rcpt
1140  *
1141  * return:: zero on failure and 1 on success. */
1142 
1143 /* Called by:  zxbus_send_cmd, zxbuslist_main */
zxbus_send_cmdf(zxid_conf * cf,struct zxid_bus_url * bu,int body_len,const char * body,const char * fmt,...)1144 int zxbus_send_cmdf(zxid_conf* cf, struct zxid_bus_url* bu, int body_len, const char* body, const char* fmt, ...)
1145 {
1146   va_list ap;
1147   int len, siglen, ver;
1148   char* eid;
1149   char* dest;
1150   char* rcpt;
1151   char buf[1024];
1152   char sha1_buf[28];
1153   struct zx_str sha1_ss;
1154   struct zx_str eid_ss;
1155   struct zx_str* logpath;
1156   struct stomp_hdr stomp;
1157 
1158   if (body_len == -1 && body)
1159     body_len = strlen(body);
1160 
1161   va_start(ap, fmt);
1162   len = vsnprintf(buf, sizeof(buf), fmt, ap);
1163   va_end(ap);
1164 
1165   rcpt = strstr(buf, "\nreceipt:");
1166   if (rcpt)
1167     rcpt += sizeof("\nreceipt:")-1;
1168   else
1169     rcpt = "\n";
1170 
1171   dest = strstr(buf, "\ndestination:");
1172   if (dest)
1173     dest += sizeof("\ndestination:")-1;
1174   else
1175     dest = "\n";
1176 
1177   if (cf->log_issue_msg) {
1178     /* Path will be composed of sha1 hash of the data in buf. */
1179     sha1_safe_base64(sha1_buf, len, buf);
1180     sha1_buf[27] = 0;
1181     sha1_ss.len = 27;
1182     sha1_ss.s = sha1_buf;
1183     eid_ss.len = strlen(bu->eid);
1184     eid_ss.s = bu->eid;
1185     logpath = zxlog_path(cf, &eid_ss, &sha1_ss, ZXLOG_ISSUE_DIR, ZXLOG_WIR_KIND, 1);
1186     if (logpath) {
1187       eid_ss.len = body_len;
1188       eid_ss.s = (char*)body;
1189       zxlog_blob(cf, cf->log_issue_msg, logpath, &eid_ss, "zxbus_send_cmdf");
1190       zx_str_free(cf->ctx, logpath);
1191     }
1192   }
1193 
1194   HEXDUMP(" buf:", buf, buf+len, /*16*/ 256);
1195   if (body) HEXDUMP("body:", body, body+body_len, /*16*/ 256);
1196 
1197 #ifdef USE_OPENSSL
1198   if (bu->ssl) {
1199     SSL_write(bu->ssl, buf, len);
1200     if (body)
1201       SSL_write(bu->ssl, body, body_len);
1202     SSL_write(bu->ssl, "\0", 1);
1203   } else {
1204     send_all_socket(bu->fd, buf, len);
1205     if (body)
1206       send_all_socket(bu->fd, body, body_len);
1207     send_all_socket(bu->fd, "\0", 1);
1208   }
1209 #else
1210   send_all_socket(bu->fd, buf, len);
1211   if (body)
1212     send_all_socket(bu->fd, body, body_len);
1213   send_all_socket(bu->fd, "\0", 1);
1214 #endif
1215 
1216   memset(&stomp, 0, sizeof(struct stomp_hdr));
1217   if (zxbus_read_stomp(cf, bu, &stomp)) {
1218     if (!memcmp(bu->m, "RECEIPT", sizeof("RECEIPT")-1)) {
1219       if (atoi(stomp.rcpt_id) == bu->cur_rcpt - 1) {
1220 	D("%.*s got RECEIPT %d", 4, buf, bu->cur_rcpt-1);
1221 
1222 	siglen = stomp.zx_rcpt_sig ? (strchr(stomp.zx_rcpt_sig, '\n') - stomp.zx_rcpt_sig) : 0;
1223 	eid = zxid_my_ent_id_cstr(cf);
1224 	ver = zxbus_verify_receipt(cf, bu->eid,
1225 				   siglen, siglen?stomp.zx_rcpt_sig:"",
1226 				   -2, rcpt,
1227 				   -2, dest,
1228 				   -1, eid,  /* our eid, the receipt was issued to us */
1229 				   body_len, body);
1230 	ZX_FREE(cf->ctx, eid);
1231 	if (ver != ZXSIG_OK) {
1232 	  ERR("RECEIPT signature validation failed: %d sig(%.*s) body(%.*s)", ver, siglen, siglen?stomp.zx_rcpt_sig:"", body_len, body);
1233 	  return 0;
1234 	}
1235 
1236 	if (zxbus_verbose) {
1237 	  fprintf(stdout, "%.*s(%.*s) got RECEIPT %d\n", 4, buf, body?body_len:0, body?body:"", bu->cur_rcpt-1);
1238 	}
1239 	if (cf->log_rely_msg) {   /* Log the receipt */
1240 	  zxbus_log_receipt(cf, bu, -2, rcpt, -2, dest, sha1_buf, siglen, siglen?stomp.zx_rcpt_sig:"");
1241 	}
1242 	zxbus_shift_read_buf(cf, bu, &stomp);
1243 	return 1;  /* normal successful return */
1244       } else {
1245 	ERR("Send to %s failed. RECEIPT number(%.*s)=%d mismatch cur_rcpt-1=%d (%s)", bu->s, (int)(bu->ap - stomp.rcpt_id), stomp.rcpt_id, atoi(stomp.rcpt_id), bu->cur_rcpt-1, bu->m);
1246 	zxbus_shift_read_buf(cf, bu, &stomp);
1247 	goto errout;
1248       }
1249     } else {
1250       ERR("Send to %s failed. Other end did not send RECEIPT(%.*s)", bu->s, (int)(bu->ap - bu->m), bu->m);
1251       zxbus_shift_read_buf(cf, bu, &stomp);
1252     }
1253   } else {
1254     ERR("Send to %s failed. Other end did not send RECEIPT. Read error.", bu->s);
1255   }
1256  errout:
1257 #ifdef USE_OPENSSL
1258   if (bu->ssl) {
1259     SSL_shutdown(bu->ssl);
1260     SSL_free(bu->ssl);
1261     bu->ssl = 0;
1262   }
1263 #endif
1264   closesocket((SOCKET)bu->fd);
1265   bu->fd = 0;
1266   return 0;
1267 }
1268 
1269 /*() Send the specified STOMP 1.1 message to audit bus and wait for RECEIPT.
1270  * Blocks until the transaction completes (or fails). Figures out
1271  * from configuration, which bus daemon to contact (looks at bus_urls).
1272  *
1273  * Returns:: zero on failure and 1 on success. */
1274 
1275 /* Called by:  zxbus_send, zxbustailf_main */
zxbus_send_cmd(zxid_conf * cf,const char * cmd,const char * dest,int n,const char * logbuf)1276 int zxbus_send_cmd(zxid_conf* cf, const char* cmd, const char* dest, int n, const char* logbuf)
1277 {
1278   struct zxid_bus_url* bu;
1279   bu = cf->bus_url;
1280   if (!bu || !bu->s || !bu->s[0])
1281     return 0;         /* No bus_url configured means audit bus reporting is disabled. */
1282 
1283   /* *** implement intelligent lbfo algo */
1284 
1285   if (!bu->fd)
1286     zxbus_open_bus_url(cf, bu);
1287   if (!bu->fd)
1288     return 0;
1289   return zxbus_send_cmdf(cf, bu, n, logbuf, "%s\ndestination:%s\nreceipt:%d\ncontent-length:%d\n\n", cmd, dest, bu->cur_rcpt++, n);
1290 }
1291 
1292 /*() SEND a STOMP 1.1 message to audit bus and wait for RECEIPT.
1293  * Blocks until the transaction completes (or fails). Figures out
1294  * from configuration, which bus daemon to contact (looks at bus_urls).
1295  *
1296  * Returns:: zero on failure and 1 on success. */
1297 
1298 /* Called by:  zxbustailf_main x2 */
zxbus_send(zxid_conf * cf,const char * dest,int n,const char * logbuf)1299 int zxbus_send(zxid_conf* cf, const char* dest, int n, const char* logbuf)
1300 {
1301   return zxbus_send_cmd(cf, "SEND", dest, n, logbuf);
1302 }
1303 
1304 #if 0
1305 /*(i) Log to activity and/or error log depending on ~res~ and configuration settings.
1306  * This is the main audit logging function you should call. Please see <<link:../../html/zxid-log.html: zxid-log.pd>>
1307  * for detailed description of the log format and features. See <<link:../../html/zxid-conf.html: zxid-conf.pd>> for
1308  * configuration options governing the logging. (*** check the links)
1309  *
1310  * Proper audit trail is essential for any high value transactions based on SSO. Also
1311  * some SAML protocol Processing Rules, such as duplicate detection, depend on the
1312  * logging.
1313  *
1314  * cf     (1)::  ZXID configuration object, used for configuration options and memory allocation
1315  * ourts  (2)::  Timestamp as observed by localhost. Typically the wall clock
1316  *     time. See gettimeofday(3)
1317  * srcts  (3)::  Timestamp claimed by the message to which the log entry pertains
1318  * ipport (4)::  IP address and port number from which the message appears to have originated
1319  * entid  (5)::  Entity ID to which the message pertains, usually the issuer. Null ok.
1320  * msgid  (6)::  Message ID, can be used for correlation to establish audit trail continuity
1321  *     from request to response. Null ok.
1322  * a7nid  (7)::  Assertion ID, if message contained assertion (outermost and first
1323  *     assertion if there are multiple relevant assertions). Null ok.
1324  * nid    (8)::  Name ID pertaining to the message
1325  * sigval (9)::  Signature validation letters
1326  * res   (10)::  Result letters
1327  * op    (11)::  Operation code for the message
1328  * arg   (12)::  Operation specific argument
1329  * fmt, ...  ::  Free format message conveying additional information
1330  * return:: 0 on success, nonzero on failure (often ignored as zxbus() is very
1331  *     robust and rarely fails - and when it does, situation is so hopeless that
1332  *     you would not be able to report its failure anyway)
1333  */
1334 
1335 /* Called by:  zxid_an_page_cf, zxid_anoint_sso_a7n, zxid_anoint_sso_resp, zxid_chk_sig, zxid_decode_redir_or_post x2, zxid_fed_mgmt_cf, zxid_get_ent_by_sha1_name, zxid_get_ent_ss, zxid_get_meta x2, zxid_idp_dispatch, zxid_idp_select_zxstr_cf_cgi, zxid_idp_soap_dispatch x2, zxid_idp_soap_parse, zxid_parse_conf_raw, zxid_parse_meta, zxid_saml_ok x2, zxid_simple_render_ses, zxid_simple_ses_active_cf, zxid_sp_anon_finalize, zxid_sp_deref_art x5, zxid_sp_dig_sso_a7n x2, zxid_sp_dispatch, zxid_sp_meta, zxid_sp_mni_redir, zxid_sp_mni_soap, zxid_sp_slo_redir, zxid_sp_slo_soap, zxid_sp_soap_dispatch x2, zxid_sp_soap_parse, zxid_sp_sso_finalize x2, zxid_start_sso_url x3 */
1336 int zxbus(zxid_conf* cf,   /* 1 */
1337 	  struct timeval* ourts,  /* 2 null allowed, will use current time */
1338 	  struct timeval* srcts,  /* 3 null allowed, will use start of unix epoch + 501 usec */
1339 	  const char* ipport,     /* 4 null allowed, -:- or cf->ipport if not given */
1340 	  struct zx_str* entid,   /* 5 null allowed, - if not given */
1341 	  struct zx_str* msgid,   /* 6 null allowed, - if not given */
1342 	  struct zx_str* a7nid,   /* 7 null allowed, - if not given */
1343 	  struct zx_str* nid,     /* 8 null allowed, - if not given */
1344 	  const char* sigval,     /* 9 null allowed, - if not given */
1345 	  const char* res,        /* 10 */
1346 	  const char* op,         /* 11 */
1347 	  const char* arg,        /* 12 null allowed, - if not given */
1348 	  const char* fmt, ...)   /* 13 null allowed as format, ends the line w/o further ado */
1349 {
1350   int n;
1351   char logbuf[1024];
1352   va_list ap;
1353 
1354   /* Avoid computation if logging is hopeless. */
1355 
1356   if (!((cf->log_err_in_act || res[0] == 'K') && cf->log_act)
1357       && !(cf->log_err && res[0] != 'K')) {
1358     return 0;
1359   }
1360 
1361   va_start(ap, fmt);
1362   n = zxbus_fmt(cf, sizeof(logbuf), logbuf,
1363 		ourts, srcts, ipport, entid, msgid, a7nid, nid, sigval, res,
1364 		op, arg, fmt, ap);
1365   va_end(ap);
1366   return zxbus_output(cf, n, logbuf, res);
1367 }
1368 #endif
1369 
1370 /* EOF  --  zxbusprod.c */
1371