1 /* stomp.c  -  STOMP 1.1 protocol for HIIOS engine
2  * Copyright (c) 2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: http.c may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 16.8.2012, created, based on http and smtp --Sampo
11  * 19.8.2012, added tolerance for CRLF where strictly LF is meant --Sampo
12  *
13  * STOMP 1.1 frame is considered a PDU and consists of
14  *
15  *   \n              -- zero, or more in case of heart beats
16  *   COMMAND\n       -- lines end in LF (not CRLF)
17  *   header:value\n  -- zero or more
18  *   \n              -- blank line separates headers and body
19  *   payload
20  *   \0
21  *   \n              -- zero, or more in case of heart beats
22  *
23  * See also:  http://stomp.github.com/stomp-specification-1.1.html (20110331)
24  * Todo: implement heart beat generation and checking
25  */
26 
27 #include "platform.h"
28 #include "errmac.h"
29 #include "akbox.h"
30 #include "hiios.h"
31 #include "hiproto.h"
32 #include <zx/zxidconf.h>
33 #include <zx/zxidutil.h>
34 
35 #include <ctype.h>
36 #include <memory.h>
37 #include <stdlib.h>
38 #include <netinet/in.h> /* htons(3) and friends */
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #include <errno.h>
43 
44 /* Alias some struct fields for headers that can not be seen together. */
45 #define receipt   host
46 #define rcpt_id   host
47 #define acpt_vers vers
48 #define tx_id     vers
49 #define session   login
50 #define subs_id   login
51 #define subsc     login
52 #define server    pw
53 #define ack       pw
54 #define msg_id    pw
55 #define heart_bt  dest
56 #define zx_rcpt_sig dest
57 #define STOMP_MIN_PDU_SIZE (sizeof("ACK\n\n\0")-1)
58 
59 extern int verbose;  /* defined in option parsing in zxbusd.c */
60 extern zxid_conf* zxbus_cf;
61 
62 #if 0
63 /* Called by: */
64 static struct hi_pdu* stomp_encode_start(struct hi_thr* hit)
65 {
66   struct hi_pdu* resp = hi_pdu_alloc(hit,"stomp_enc_start");
67   if (!resp) { hi_dump(hit->shf); NEVERNEVER("*** out of pdus in bad place %d", 0); }
68   return resp;
69 }
70 #endif
71 
72 /*() Send ERROR to remote client. */
73 
74 /* Called by:  stomp_frame_err, stomp_got_login x2, zxbus_persist x2 */
75 int stomp_err(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, const char* ecode, const char* emsg)
76 {
77   int len;
78   char* rcpt;
79   if ((rcpt = req->ad.stomp.receipt)) {
80     len = memchr(rcpt, '\n', req->ap - rcpt) - (void*)rcpt;
81   } else {
82     len = 1;
83     rcpt = "-";
84   }
85   ERR("%s", emsg);
86   hi_sendf(hit, io, 0, req, "ERROR\nmessage:%s\nreceipt-id:%.*s\ncontent-type:text/plain\ncontent-length:%d\n\n%s%c", ecode, len, rcpt, strlen(emsg), emsg, 0);
87   return HI_CONN_CLOSE;
88 }
89 
90 /*() Send an error early on in decode process */
91 
92 /* Called by:  stomp_decode x2 */
93 static int stomp_frame_err(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, const char* emsg)
94 {
95   /* At this early stage the req is still a io->cur_pdu. We need to
96    * promote it to a real request so that the free logic will work right. */
97   hi_add_to_reqs(hit, io, req, STOMP_MIN_PDU_SIZE);
98   return stomp_err(hit,io,req,"malformed frame received",emsg);
99 }
100 
101 #define CMD_NI_MSG "Command(%.*s) not implemented by server."
102 
103 /*() Send not implemented ERROR to remote client. */
104 
105 /* Called by:  stomp_decode x7, stomp_got_unsubsc */
106 static int stomp_cmd_ni(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, const char* cmd)
107 {
108   const char* nl = strchr(cmd, '\n');
109   int len;
110   char* rcpt;
111   if ((rcpt = req->ad.stomp.receipt)) {
112     len = memchr(rcpt, '\n', req->ap - rcpt) - (void*)rcpt;
113   } else {
114     len = 1;
115     rcpt = "-";
116   }
117   ERR(CMD_NI_MSG, (int)(nl-cmd), cmd);
118   hi_sendf(hit, io, 0, req, "ERROR\nmessage:command not implemented by server\nreceipt-id:%.*s\ncontent-type:text/plain\ncontent-length:%d\n\n" CMD_NI_MSG "%c", len, rcpt, sizeof(CMD_NI_MSG)-3+(nl-cmd), nl-cmd, cmd, 0);
119   return HI_CONN_CLOSE;
120 }
121 
122 /*() Got ERROR from remote client. */
123 
124 /* Called by:  stomp_decode */
125 static int stomp_got_err(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
126 {
127   /*struct hi_pdu* resp = stomp_encode_start(hit);*/
128   /*hi_sendv(hit, io, 0, req, resp, len, resp->m, size, req->m + len);*/
129   ERR("remote sent error(%.*s)", (int)(req->ap-req->m), req->m);
130   return HI_CONN_CLOSE;
131 }
132 
133 /*() Send a receipt to client. */
134 
135 /* Called by:  stomp_got_disc, stomp_got_send, stomp_got_zxctl, zxbus_subscribe */
136 void stomp_send_receipt(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
137 {
138   int len;
139   char* rcpt;
140   char sigbuf[1024];
141   if ((rcpt = req->ad.stomp.receipt)) {
142     len = (char*)memchr(rcpt, '\n', req->ap - rcpt) - rcpt;
143   } else {
144     len = 1;
145     rcpt = "-";
146   }
147   DD("rcpt(%.*s) len=%d", len, rcpt, len);
148 
149   zxbus_mint_receipt(zxbus_cf, sizeof(sigbuf), sigbuf,
150 		     len, rcpt,
151 		     -2, req->ad.stomp.dest,
152 		     -1, io->ent->eid,   /* entity to which we issue receipt */
153 		     req->ad.stomp.len, req->ad.stomp.body);
154   hi_sendf(hit, io, 0, req, "RECEIPT\nreceipt-id:%.*s\nzx-rcpt-sig:%s\n\n%c", len, rcpt, sigbuf,0);
155 }
156 
157 /* STOMP Received Command Handling */
158 
159 /* Called by:  stomp_decode */
160 static int stomp_got_login(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
161 {
162   if (!req->ad.stomp.login)
163     return stomp_err(hit, io, req, "login fail", "No login header supplied (client error). zxbusd(8) requires login header whose value is the EntityID of the connecting client.");
164 
165   if (zxbus_login_ent(hit, io, req)) {
166     hi_sendf(hit, io, 0, req, "CONNECTED\nversion:1.1\nserver:zxbusd-1.x\n\n%c", 0);
167     return 0;
168   } else
169     return stomp_err(hit, io, req, "login fail", "login failed either due to nonexistent entity id or bad credential");
170 }
171 
172 /* Called by:  stomp_decode */
173 static int stomp_got_disc(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
174 {
175   /* *** figure out the real receipt id */
176   stomp_send_receipt(hit, io, req);
177   return HI_CONN_CLOSE;
178 }
179 
180 /*() Main function for receiving audit bus summary log lines from SPs.
181  * This function will first store the line in a persistent way (RAID1
182  * arrangement needs to be implemented at the OS level), and then,
183  * perhaps, attempt to deliver the message to all subscribers. Indeed,
184  * delivery should be attempted first and if successful, the persistence
185  * is not necessary. If delivery is unsuccessful, the delivery needs
186  * to be retried, i.e. this is a store-and-forward system. However,
187  * the delivery first approach needs extensive IO engine
188  * operations and it thus may be easier to just store the message
189  * first and then have a separate process attempt the sending. This
190  * latter is the approach adopted here. */
191 
192 /* Called by:  stomp_decode */
193 static void stomp_got_send(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
194 {
195   if (zxbus_persist(hit, io, req)) {
196     stomp_send_receipt(hit, io, req);
197   } else {
198     ERR("Persist Problem. Disk full? %d", 0);
199     //hi_sendf(hit, io, 0, req, "ERROR\nmessage:persist failure\nreceipt-id:%.*s\n\nUnable to persist message. Can not guarantee reliable delivery, therefore rejecting.%c", len, rcpt, 0);
200   }
201 }
202 
203 /*() Find a request that matches response. Looks in
204  * the io->pending list for message ID match. When found,
205  * the req is dequeued from pending. */
206 
207 /* Called by:  stomp_got_ack, stomp_got_nack */
208 static struct hi_pdu* stomp_find_pending_req_for_resp(struct hi_io* io, struct hi_pdu* resp)
209 {
210   struct hi_pdu* prev;
211   struct hi_pdu* req;
212   int midlen=resp->ad.stomp.msg_id?(strchr(resp->ad.stomp.msg_id,'\n')- resp->ad.stomp.msg_id):0;
213 
214   LOCK(io->qel.mut, "ack");
215   for (prev = 0, req = io->pending; req; prev = req, req = req->n) {
216     if (!memcmp(resp->ad.stomp.msg_id, req->ad.stomp.msg_id, midlen+1)) {
217       if (prev)
218 	prev->n = req->n;
219       else
220 	io->pending = req->n;
221       resp->req = req;
222       resp->parent = req->parent;
223       break;
224     }
225   }
226   UNLOCK(io->qel.mut, "ack");
227   return req;
228 }
229 
230 /*() Process NACK response from client to MESSAGE request sent by server.
231  * This is essentially nice way for the client to communicate to us it has
232  * difficulty in persisting the message. It could also just hang up and the
233  * net effect would be the same. However, receiving the NACK allows us to
234  * close the delivery bitch sooner so we can free the memory in the
235  * hopeless cases quicker. (*** there should also be a handler for
236  * close-connection lost that would do similar cleanup) */
237 
238 /* Called by:  stomp_decode */
239 static void stomp_got_nack(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* resp)
240 {
241   int sublen, midlen, siglen;
242   struct hi_pdu* parent;
243 
244   sublen = resp->ad.stomp.subsc ? (strchr(resp->ad.stomp.subsc, '\n') - resp->ad.stomp.subsc) : 0;
245   midlen = resp->ad.stomp.msg_id ?(strchr(resp->ad.stomp.msg_id, '\n')- resp->ad.stomp.msg_id) : 0;
246   siglen = resp->ad.stomp.zx_rcpt_sig ? (strchr(resp->ad.stomp.zx_rcpt_sig, '\n') - resp->ad.stomp.zx_rcpt_sig) : 0;
247 
248   D("NACK subsc(%.*s) msg_id(%.*s) zx_rcpt_sig(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:"", siglen, siglen?resp->ad.stomp.zx_rcpt_sig:"");
249 
250   ASSERTOPP(resp->req, ==, 0);
251   if (!stomp_find_pending_req_for_resp(io, resp)) {
252     ERR("Unsolicited NACK subsc(%.*s) msg_id(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:"");
253     return;
254   }
255   parent = resp->parent;
256   ASSERT(parent);
257 
258   /* *** add validation of zx_rcpt_sig. Lookup the cert using metadata for the EID. */
259   /* Remember NACK somewhere? */
260   hi_free_resp(hit, resp, "nack ");
261 
262   ++(parent->ad.delivb.nacks);
263   if (--(parent->ad.delivb.acks) <= 0) {
264     ASSERTOPI(parent->ad.delivb.acks, ==, 0);
265     close_file(parent->ad.delivb.ack_fd, "got_nack");
266     D("nack: freeing parent(%p)", parent);
267     hi_free_req(hit, parent, "nack ");
268   }
269 }
270 
271 /*() Process ACK response from client to MESSAGE request sent by server. */
272 
273 /* Called by:  stomp_decode */
274 static void stomp_got_ack(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* resp)
275 {
276   int sublen, midlen, siglen, ver;
277   struct hi_pdu* parent;
278   char* eid;
279   char buf[1024];
280 
281   /* First, it was wrong for hi_add_to_reqs() to be called for an ACK as acks are
282    * really responses to MESSAGEs. Thus undo that action here. */
283 
284   hi_del_from_reqs(io, resp);
285 
286   sublen = resp->ad.stomp.subsc ? (strchr(resp->ad.stomp.subsc, '\n') - resp->ad.stomp.subsc) : 0;
287   midlen = resp->ad.stomp.msg_id ?(strchr(resp->ad.stomp.msg_id, '\n')- resp->ad.stomp.msg_id) : 0;
288   siglen = resp->ad.stomp.zx_rcpt_sig ? (strchr(resp->ad.stomp.zx_rcpt_sig, '\n') - resp->ad.stomp.zx_rcpt_sig) : 0;
289 
290   DD("ACK subsc(%.*s) msg_id(%.*s) zx_rcpt_sig(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:"", siglen, siglen?resp->ad.stomp.zx_rcpt_sig:"");
291 
292   if (!stomp_find_pending_req_for_resp(io, resp)) {
293     ERR("Unsolicited ACK subsc(%.*s) msg_id(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:"");
294     return;
295   }
296   parent = resp->parent;
297   ASSERT(parent);
298 
299   if (errmac_debug>1)
300     D("ACK par_%p->len=%d rq_%p->len=%d\nparent->body(%.*s)\n   req->body(%.*s)", parent, parent->ad.delivb.len, resp->req, resp->req->ad.stomp.len, parent->ad.delivb.len, parent->ad.delivb.body, resp->req->ad.stomp.len, resp->req->ad.stomp.body);
301   else
302     D("ACK par_%p->len=%d rq_%p->len=%d", parent, parent->ad.delivb.len, resp->req, resp->req->ad.stomp.len);
303 
304   eid = zxid_my_ent_id_cstr(zxbus_cf);
305   ver = zxbus_verify_receipt(zxbus_cf, io->ent->eid,
306 			     siglen, siglen?resp->ad.stomp.zx_rcpt_sig:"",
307 			     -2, resp->req->ad.stomp.msg_id,
308 			     -2, resp->req->ad.stomp.dest,
309 			     -1, eid,  /* our eid, the receipt was issued to us */
310 			     resp->req->ad.stomp.len, resp->req->ad.stomp.body);
311   ZX_FREE(zxbus_cf->ctx, eid);
312   if (ver != ZXSIG_OK) {
313     ERR("ACK signature validation failed: %d", ver);
314     hi_free_resp(hit, resp, "ack ");
315     return;
316   }
317 
318   /* Record the receipt in /var/zxid/bus/ch/DEST/.ack/SHA1.ack for our audit trail, and to
319    * indicate that we need not attempt delivery again to this entity. */
320   write_all_fd_fmt(parent->ad.delivb.ack_fd, "ACK", sizeof(buf), buf, "AB1 %s ACK %.*s\n",
321 		   io->ent->eid, siglen, siglen?resp->ad.stomp.zx_rcpt_sig:"");
322 
323   hi_free_resp(hit, resp, "ack ");
324 
325   if (--(parent->ad.delivb.acks) <= 0) {
326     ASSERTOPI(parent->ad.delivb.acks, ==, 0);
327     close_file(parent->ad.delivb.ack_fd, "got_ack");
328     if (!parent->ad.delivb.nacks) {
329       D("Delivered to all: mv msg to .del par_%p", parent);
330       zxbus_retire(hit, parent);
331     }
332     hi_free_req(hit, parent, "parent ");
333   }
334 }
335 
336 /* Called by:  stomp_decode */
337 static void stomp_got_subsc(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
338 {
339   HI_SANITY(hit->shf, hit);
340   if (zxbus_subscribe(hit, io, req)) {
341     /* N.B. The receipt was already sent. It needs to be sent before
342      * scheduling pending deliveries, lest the simple listener clients
343      * get confused by seeing a MESSAGE when expecting RECEIPT. */
344   } else {
345     ERR("Subscribe Problem. Disk full? %d", 0);
346   }
347 }
348 
349 /* Called by:  stomp_decode */
350 static void stomp_got_unsubsc(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
351 {
352   stomp_cmd_ni(hit,io,req,"UNSUBSCRIBE\n");
353 }
354 
355 /*() Nonstandard STOMP command for ZXBUS testing.
356  * Based on different body content, diffrent magic can be invoked:
357  * dump - dump data structures to stdout with hi_dump() */
358 
359 /* Called by:  stomp_decode */
360 static void stomp_got_zxctl(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
361 {
362   D("ZXCTL(%.*s)", req->ad.stomp.len, req->ad.stomp.body);
363   if (!memcmp(req->ad.stomp.body, "dump", sizeof("dump")-1)) hi_dump(hit->shf);
364   stomp_send_receipt(hit, io, req);
365 }
366 
367 /*() Parse STOMP 1.1 header to a struct. */
368 
369 /* Called by:  stomp_decode, stomp_parse_pdu */
370 void stomp_parse_header(struct hi_pdu* req, char* hdr, char* val)
371 {
372 #define HDR(header, field, valu) } else if (!memcmp(hdr, header, sizeof(header)-1)) { if (!req->ad.stomp.field) req->ad.stomp.field = (valu)
373 
374   if (!memcmp(hdr, "content-length:", sizeof("content-length:")-1)) {
375     if (!req->ad.stomp.len) {
376       req->ad.stomp.len = atoi(val);
377       req->need = req->ad.stomp.len + val - req->m + 3;  /* not accurate if more headers follow, but fix below */
378     }
379     DD("len=%d need=%d (%.*s)", req->ad.stomp.len, req->need, val-hdr-1, hdr);
380   } else if (!memcmp(hdr, "content-type:", sizeof("content-type:")-1)) { /* ignore */
381   } else if (!memcmp(hdr, "message:", sizeof("message:")-1)) { /* ignore */
382   HDR("receipt:",        receipt,   val);
383   HDR("destination:",    dest,      val);
384   HDR("zx-rcpt-sig:",    zx_rcpt_sig, val);
385   HDR("host:",           host,      val);
386   HDR("version:",        vers,      val);
387   HDR("accept-version:", acpt_vers, val);
388   HDR("server:",         server,    val);
389   HDR("heart-beat:",     heart_bt,  val);
390   HDR("login:",          login,     val);
391   HDR("passcode:",       pw,        val);
392   HDR("session:",        session,   val);
393   HDR("transaction:",    tx_id,     val);
394   HDR("id:",             subs_id,   val);
395   HDR("subscription:",   subsc,     val);
396   HDR("ack:",            ack,       val);
397   HDR("message-id:",     msg_id,    val);
398   HDR("receipt-id:",     rcpt_id,   val);  DD("receipt-id(%.*s)", 4, req->ad.stomp.rcpt_id);
399   } else {
400     D("Unknown header(%s) ignored.", hdr);
401   }
402 }
403 
404 /*() STOMP decoder and dispatch.
405  * Return:: 0 for no error (including need more and PDU complete and processed),
406  * 1 to force closing connection. */
407 
408 /* Called by:  hi_read x2 */
409 int stomp_decode(struct hi_thr* hit, struct hi_io* io)
410 {
411   struct hi_pdu* req = io->cur_pdu;
412   char* command;
413   char* hdr;
414   char* val;
415   char* p = req->m;
416 
417   D("decode req(%p)->need=%d (%.*s)", req, req->need, (int)MIN(req->ap - req->m, 3), req->m);
418 
419   HI_SANITY(hit->shf, hit);
420 
421   /* Skip newlines and check size */
422 
423   for (; p < req->ap && ONE_OF_2(*p, '\n', '\r'); ++p) ;
424 
425   if (req->ap - p < STOMP_MIN_PDU_SIZE) {   /* too little, need more */
426     req->need = STOMP_MIN_PDU_SIZE;
427     D("need=%d have=%d", req->need, (int)(req->ap - req->m));
428     return  HI_NEED_MORE;
429   }
430 
431   /* Extract command */
432 
433   command = p;
434   hdr = memchr(p, '\n', req->ap - p);
435   if (!hdr || ++hdr == req->ap) {
436     req->need = MAX(STOMP_MIN_PDU_SIZE, req->ap - req->m + 2);
437     D("need=%d have=%d", req->need, (int)(req->ap - req->m));
438     return  HI_NEED_MORE;
439   }
440   memset(&req->ad.stomp, 0, sizeof(req->ad.stomp));
441   p = hdr;
442 
443   /* Decode headers
444    * 012345678901234567890
445    * STOMP\n\n\0
446    *         ^-p
447    * STOMP\r\n\r\n\0
448    *           ^-p
449    * STOMP\nhost:foo\n\n\0
450    *        ^-p        ^-pp
451    * STOMP\r\nhost:foo\r\n\r\n\0
452    *          ^-p          ^-pp
453    * STOMP\nhost:foo\naccept-version:1.1\n\n\0
454    *        ^-p       ^-pp                 ^-ppp
455    * STOMP\r\nhost:foo\r\naccept-version:1.1\r\n\r\n\0
456    *          ^-p         ^-pp                   ^-ppp
457    */
458 
459   while (!ONE_OF_2(*p,'\n','\r')) {
460     hdr = p;
461     p = memchr(p, '\n', req->ap - p);
462     if (!p || ++p == req->ap) {
463       req->need = MAX(STOMP_MIN_PDU_SIZE, req->ap - req->m + 2);
464       D("need=%d have=%d", req->need, (int)(req->ap - req->m));
465       return HI_NEED_MORE;
466     }
467     val = memchr(hdr, ':', p-hdr);
468     if (!val)
469       return stomp_frame_err(hit, io, req, "Header missing colon.");
470     ++val; /* skip : */
471     stomp_parse_header(req, hdr, val);
472   }
473 
474   /* Now body */
475 
476   if (*p == '\r') ++p;
477   req->ad.stomp.body = ++p;
478 
479   /* p now points to first byte of body (after \n\n; at nul if no body) */
480 
481   if (req->ad.stomp.len) {
482     req->need = p - req->m + req->ad.stomp.len + 1 /* nul */;  /* req->need has to be set correctly for hi_checkmore() to work right. */
483     if (req->ad.stomp.len < req->ap - p) {
484       /* Got complete with content-length */
485       p += req->ad.stomp.len;
486       if (*p++)
487 	return stomp_frame_err(hit, io, req, "No nul to terminate body.");
488     } else {
489       D("need=%d have=%d", req->need, (int)(req->ap - req->m));
490       return HI_NEED_MORE;
491     }
492   } else {
493     /* Scan until nul */
494     while (1) {
495       if (req->ap - p < 1) {   /* too little, need more */
496 	req->need = req->ap - req->m + 1;
497 	D("need=%d have=%d", req->need, (int)(req->ap - req->m));
498 	return HI_NEED_MORE;
499       }
500       if (!*p++) {  /* nul termination found */
501 	req->ad.stomp.len = p - req->ad.stomp.body - 1;
502 	req->need = p - req->m;
503 	break;
504       }
505     }
506   }
507 
508   HI_SANITY(hit->shf, hit);
509   hi_add_to_reqs(hit, io, req, STOMP_MIN_PDU_SIZE);
510   HI_SANITY(hit->shf, hit);
511 
512   /* Command dispatch */
513 
514 #define CMD(vrb, action) } else if (!memcmp(command, vrb, sizeof(vrb)-1)) { action
515 
516   if (!memcmp(command, "STOMP", sizeof("STOMP")-1)
517       || (!memcmp(command, "CONNECT", sizeof("CONNECT")-1)
518 	  &&ONE_OF_2(command[sizeof("CONNECT")],'\n','\r')))
519   { return stomp_got_login(hit,io,req);
520   CMD("SEND",        stomp_got_send(hit,io,req) );
521   CMD("ACK",         stomp_got_ack(hit,io,req) );
522   CMD("DISCONNECT",  return stomp_got_disc(hit,io,req) );
523   CMD("SUBSCRIBE",   stomp_got_subsc(hit,io,req) );
524   CMD("UNSUBSCRIBE", stomp_got_unsubsc(hit,io,req) );
525   CMD("BEGIN",       stomp_cmd_ni(hit,io,req,p) );
526   CMD("COMMIT",      stomp_cmd_ni(hit,io,req,p) );
527   CMD("ABORT",       stomp_cmd_ni(hit,io,req,p) );
528   CMD("NACK",        stomp_got_nack(hit,io,req) );
529   /* Commands client would see (sent by server) */
530   CMD("CONNECTED",   stomp_cmd_ni(hit,io,req,p) );
531   CMD("MESSAGE",     stomp_cmd_ni(hit,io,req,p) );
532   CMD("RECEIPT",     stomp_cmd_ni(hit,io,req,p) );
533   CMD("ERROR",       return stomp_got_err(hit,io,req) );
534   CMD("ZXCTL",       stomp_got_zxctl(hit,io,req) );  /* Custom command */
535   } else {
536     D("Unknown command(%.*s) ignored.", 4, command);
537     stomp_cmd_ni(hit,io,req,p);
538   }
539   return 0;
540 }
541 
542 /*() Parse PDU to extract headers. Typically this is called
543  * to recover a persisted PDU, i.e. we can assume to have
544  * all the data at hand already. This simplifies error reporting.
545  * return:: 0 on success, 1 on error
546  * see also:: stomp_decode() is usually used for decoring network traffic */
547 
548 /* Called by:  zxbus_sched_pending_delivery */
549 int stomp_parse_pdu(struct hi_pdu* pdu)
550 {
551   char* hdr;
552   char* val;
553   char* p;
554 
555   /* Parse STOMP message */
556 
557   memset(&pdu->ad.stomp, 0, sizeof(pdu->ad.stomp));
558   p = pdu->m;
559   hdr = memchr(p, '\n', pdu->ap - p);
560   if (!hdr || ++hdr == pdu->ap) {
561     pdu->need = MAX(STOMP_MIN_PDU_SIZE, pdu->ap - pdu->m + 2);
562     ERR("PDU from file is too small. need=%d have=%d", pdu->need, (int)(pdu->ap - pdu->m));
563     return 1;
564   }
565   p = hdr;
566 
567   while (!ONE_OF_2(*p,'\n','\r')) {
568     hdr = p;
569     p = memchr(p, '\n', pdu->ap - p);
570     if (!p || ++p == pdu->ap) {
571       pdu->need = MAX(STOMP_MIN_PDU_SIZE, pdu->ap - pdu->m + 2);
572       ERR("need=%d have=%d", pdu->need, (int)(pdu->ap - pdu->m));
573       return 1;
574     }
575     val = memchr(hdr, ':', p-hdr);
576     if (!val) {
577       ERR("Malformed PDU from file. Header missing a colon. %p", pdu);
578       return 1;
579     }
580     ++val; /* skip : */
581     stomp_parse_header(pdu, hdr, val);
582   }
583 
584   if (*p == '\r') ++p;
585   pdu->ad.stomp.body = ++p;
586 
587   if (pdu->ad.stomp.len) {
588     pdu->need = p - pdu->m + pdu->ad.stomp.len + 1 /* nul */;
589     if (pdu->ad.stomp.len < pdu->ap - p) {
590       /* Got complete with content-length */
591       p += pdu->ad.stomp.len;
592       if (*p++) {
593 	ERR("Malformed PDU from file: No nul to terminate body. %x", p[-1]);
594       }
595     } else {
596       ERR("PDU from file has specified length(%d), but not enough data. need=%d have=%d", pdu->ad.stomp.len, pdu->need, (int)(pdu->ap - pdu->m));
597       return 1;
598     }
599     return 0;
600   } else {
601     /* Scan until nul */
602     while (1) {
603       if (pdu->ap - p < 1) {   /* too little, need more */
604 	pdu->need = pdu->ap - pdu->m + 1;
605 	ERR("PDU from file without length but too short to have even nul termination. need=%d have=%d", pdu->need, (int)(pdu->ap - pdu->m));
606 	return 1;
607       }
608       if (!*p++) {  /* nul termination found */
609 	pdu->ad.stomp.len = p - pdu->ad.stomp.body - 1;
610 	pdu->need = p - pdu->m;
611 	return 0;
612       }
613     }
614   }
615   return 0;
616 }
617 
618 /* EOF  --  stomp.c */
619