1 /* stomp.c - STOMP 1.1 protocol for HIIOS engine 2 * Copyright (c) 2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved. 3 * This is confidential unpublished proprietary source code of the author. 4 * NO WARRANTY, not even implied warranties. Contains trade secrets. 5 * Distribution prohibited unless authorized in writing. See file COPYING. 6 * Special grant: http.c may be used with zxid open source project under 7 * same licensing terms as zxid itself. 8 * $Id$ 9 * 10 * 16.8.2012, created, based on http and smtp --Sampo 11 * 19.8.2012, added tolerance for CRLF where strictly LF is meant --Sampo 12 * 13 * STOMP 1.1 frame is considered a PDU and consists of 14 * 15 * \n -- zero, or more in case of heart beats 16 * COMMAND\n -- lines end in LF (not CRLF) 17 * header:value\n -- zero or more 18 * \n -- blank line separates headers and body 19 * payload 20 * \0 21 * \n -- zero, or more in case of heart beats 22 * 23 * See also: http://stomp.github.com/stomp-specification-1.1.html (20110331) 24 * Todo: implement heart beat generation and checking 25 */ 26 27 #include "platform.h" 28 #include "errmac.h" 29 #include "akbox.h" 30 #include "hiios.h" 31 #include "hiproto.h" 32 #include <zx/zxidconf.h> 33 #include <zx/zxidutil.h> 34 35 #include <ctype.h> 36 #include <memory.h> 37 #include <stdlib.h> 38 #include <netinet/in.h> /* htons(3) and friends */ 39 #include <sys/types.h> 40 #include <sys/stat.h> 41 #include <fcntl.h> 42 #include <errno.h> 43 44 /* Alias some struct fields for headers that can not be seen together. */ 45 #define receipt host 46 #define rcpt_id host 47 #define acpt_vers vers 48 #define tx_id vers 49 #define session login 50 #define subs_id login 51 #define subsc login 52 #define server pw 53 #define ack pw 54 #define msg_id pw 55 #define heart_bt dest 56 #define zx_rcpt_sig dest 57 #define STOMP_MIN_PDU_SIZE (sizeof("ACK\n\n\0")-1) 58 59 extern int verbose; /* defined in option parsing in zxbusd.c */ 60 extern zxid_conf* zxbus_cf; 61 62 #if 0 63 /* Called by: */ 64 static struct hi_pdu* stomp_encode_start(struct hi_thr* hit) 65 { 66 struct hi_pdu* resp = hi_pdu_alloc(hit,"stomp_enc_start"); 67 if (!resp) { hi_dump(hit->shf); NEVERNEVER("*** out of pdus in bad place %d", 0); } 68 return resp; 69 } 70 #endif 71 72 /*() Send ERROR to remote client. */ 73 74 /* Called by: stomp_frame_err, stomp_got_login x2, zxbus_persist x2 */ 75 int stomp_err(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, const char* ecode, const char* emsg) 76 { 77 int len; 78 char* rcpt; 79 if ((rcpt = req->ad.stomp.receipt)) { 80 len = memchr(rcpt, '\n', req->ap - rcpt) - (void*)rcpt; 81 } else { 82 len = 1; 83 rcpt = "-"; 84 } 85 ERR("%s", emsg); 86 hi_sendf(hit, io, 0, req, "ERROR\nmessage:%s\nreceipt-id:%.*s\ncontent-type:text/plain\ncontent-length:%d\n\n%s%c", ecode, len, rcpt, strlen(emsg), emsg, 0); 87 return HI_CONN_CLOSE; 88 } 89 90 /*() Send an error early on in decode process */ 91 92 /* Called by: stomp_decode x2 */ 93 static int stomp_frame_err(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, const char* emsg) 94 { 95 /* At this early stage the req is still a io->cur_pdu. We need to 96 * promote it to a real request so that the free logic will work right. */ 97 hi_add_to_reqs(hit, io, req, STOMP_MIN_PDU_SIZE); 98 return stomp_err(hit,io,req,"malformed frame received",emsg); 99 } 100 101 #define CMD_NI_MSG "Command(%.*s) not implemented by server." 102 103 /*() Send not implemented ERROR to remote client. */ 104 105 /* Called by: stomp_decode x7, stomp_got_unsubsc */ 106 static int stomp_cmd_ni(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, const char* cmd) 107 { 108 const char* nl = strchr(cmd, '\n'); 109 int len; 110 char* rcpt; 111 if ((rcpt = req->ad.stomp.receipt)) { 112 len = memchr(rcpt, '\n', req->ap - rcpt) - (void*)rcpt; 113 } else { 114 len = 1; 115 rcpt = "-"; 116 } 117 ERR(CMD_NI_MSG, (int)(nl-cmd), cmd); 118 hi_sendf(hit, io, 0, req, "ERROR\nmessage:command not implemented by server\nreceipt-id:%.*s\ncontent-type:text/plain\ncontent-length:%d\n\n" CMD_NI_MSG "%c", len, rcpt, sizeof(CMD_NI_MSG)-3+(nl-cmd), nl-cmd, cmd, 0); 119 return HI_CONN_CLOSE; 120 } 121 122 /*() Got ERROR from remote client. */ 123 124 /* Called by: stomp_decode */ 125 static int stomp_got_err(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 126 { 127 /*struct hi_pdu* resp = stomp_encode_start(hit);*/ 128 /*hi_sendv(hit, io, 0, req, resp, len, resp->m, size, req->m + len);*/ 129 ERR("remote sent error(%.*s)", (int)(req->ap-req->m), req->m); 130 return HI_CONN_CLOSE; 131 } 132 133 /*() Send a receipt to client. */ 134 135 /* Called by: stomp_got_disc, stomp_got_send, stomp_got_zxctl, zxbus_subscribe */ 136 void stomp_send_receipt(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 137 { 138 int len; 139 char* rcpt; 140 char sigbuf[1024]; 141 if ((rcpt = req->ad.stomp.receipt)) { 142 len = (char*)memchr(rcpt, '\n', req->ap - rcpt) - rcpt; 143 } else { 144 len = 1; 145 rcpt = "-"; 146 } 147 DD("rcpt(%.*s) len=%d", len, rcpt, len); 148 149 zxbus_mint_receipt(zxbus_cf, sizeof(sigbuf), sigbuf, 150 len, rcpt, 151 -2, req->ad.stomp.dest, 152 -1, io->ent->eid, /* entity to which we issue receipt */ 153 req->ad.stomp.len, req->ad.stomp.body); 154 hi_sendf(hit, io, 0, req, "RECEIPT\nreceipt-id:%.*s\nzx-rcpt-sig:%s\n\n%c", len, rcpt, sigbuf,0); 155 } 156 157 /* STOMP Received Command Handling */ 158 159 /* Called by: stomp_decode */ 160 static int stomp_got_login(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 161 { 162 if (!req->ad.stomp.login) 163 return stomp_err(hit, io, req, "login fail", "No login header supplied (client error). zxbusd(8) requires login header whose value is the EntityID of the connecting client."); 164 165 if (zxbus_login_ent(hit, io, req)) { 166 hi_sendf(hit, io, 0, req, "CONNECTED\nversion:1.1\nserver:zxbusd-1.x\n\n%c", 0); 167 return 0; 168 } else 169 return stomp_err(hit, io, req, "login fail", "login failed either due to nonexistent entity id or bad credential"); 170 } 171 172 /* Called by: stomp_decode */ 173 static int stomp_got_disc(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 174 { 175 /* *** figure out the real receipt id */ 176 stomp_send_receipt(hit, io, req); 177 return HI_CONN_CLOSE; 178 } 179 180 /*() Main function for receiving audit bus summary log lines from SPs. 181 * This function will first store the line in a persistent way (RAID1 182 * arrangement needs to be implemented at the OS level), and then, 183 * perhaps, attempt to deliver the message to all subscribers. Indeed, 184 * delivery should be attempted first and if successful, the persistence 185 * is not necessary. If delivery is unsuccessful, the delivery needs 186 * to be retried, i.e. this is a store-and-forward system. However, 187 * the delivery first approach needs extensive IO engine 188 * operations and it thus may be easier to just store the message 189 * first and then have a separate process attempt the sending. This 190 * latter is the approach adopted here. */ 191 192 /* Called by: stomp_decode */ 193 static void stomp_got_send(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 194 { 195 if (zxbus_persist(hit, io, req)) { 196 stomp_send_receipt(hit, io, req); 197 } else { 198 ERR("Persist Problem. Disk full? %d", 0); 199 //hi_sendf(hit, io, 0, req, "ERROR\nmessage:persist failure\nreceipt-id:%.*s\n\nUnable to persist message. Can not guarantee reliable delivery, therefore rejecting.%c", len, rcpt, 0); 200 } 201 } 202 203 /*() Find a request that matches response. Looks in 204 * the io->pending list for message ID match. When found, 205 * the req is dequeued from pending. */ 206 207 /* Called by: stomp_got_ack, stomp_got_nack */ 208 static struct hi_pdu* stomp_find_pending_req_for_resp(struct hi_io* io, struct hi_pdu* resp) 209 { 210 struct hi_pdu* prev; 211 struct hi_pdu* req; 212 int midlen=resp->ad.stomp.msg_id?(strchr(resp->ad.stomp.msg_id,'\n')- resp->ad.stomp.msg_id):0; 213 214 LOCK(io->qel.mut, "ack"); 215 for (prev = 0, req = io->pending; req; prev = req, req = req->n) { 216 if (!memcmp(resp->ad.stomp.msg_id, req->ad.stomp.msg_id, midlen+1)) { 217 if (prev) 218 prev->n = req->n; 219 else 220 io->pending = req->n; 221 resp->req = req; 222 resp->parent = req->parent; 223 break; 224 } 225 } 226 UNLOCK(io->qel.mut, "ack"); 227 return req; 228 } 229 230 /*() Process NACK response from client to MESSAGE request sent by server. 231 * This is essentially nice way for the client to communicate to us it has 232 * difficulty in persisting the message. It could also just hang up and the 233 * net effect would be the same. However, receiving the NACK allows us to 234 * close the delivery bitch sooner so we can free the memory in the 235 * hopeless cases quicker. (*** there should also be a handler for 236 * close-connection lost that would do similar cleanup) */ 237 238 /* Called by: stomp_decode */ 239 static void stomp_got_nack(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* resp) 240 { 241 int sublen, midlen, siglen; 242 struct hi_pdu* parent; 243 244 sublen = resp->ad.stomp.subsc ? (strchr(resp->ad.stomp.subsc, '\n') - resp->ad.stomp.subsc) : 0; 245 midlen = resp->ad.stomp.msg_id ?(strchr(resp->ad.stomp.msg_id, '\n')- resp->ad.stomp.msg_id) : 0; 246 siglen = resp->ad.stomp.zx_rcpt_sig ? (strchr(resp->ad.stomp.zx_rcpt_sig, '\n') - resp->ad.stomp.zx_rcpt_sig) : 0; 247 248 D("NACK subsc(%.*s) msg_id(%.*s) zx_rcpt_sig(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:"", siglen, siglen?resp->ad.stomp.zx_rcpt_sig:""); 249 250 ASSERTOPP(resp->req, ==, 0); 251 if (!stomp_find_pending_req_for_resp(io, resp)) { 252 ERR("Unsolicited NACK subsc(%.*s) msg_id(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:""); 253 return; 254 } 255 parent = resp->parent; 256 ASSERT(parent); 257 258 /* *** add validation of zx_rcpt_sig. Lookup the cert using metadata for the EID. */ 259 /* Remember NACK somewhere? */ 260 hi_free_resp(hit, resp, "nack "); 261 262 ++(parent->ad.delivb.nacks); 263 if (--(parent->ad.delivb.acks) <= 0) { 264 ASSERTOPI(parent->ad.delivb.acks, ==, 0); 265 close_file(parent->ad.delivb.ack_fd, "got_nack"); 266 D("nack: freeing parent(%p)", parent); 267 hi_free_req(hit, parent, "nack "); 268 } 269 } 270 271 /*() Process ACK response from client to MESSAGE request sent by server. */ 272 273 /* Called by: stomp_decode */ 274 static void stomp_got_ack(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* resp) 275 { 276 int sublen, midlen, siglen, ver; 277 struct hi_pdu* parent; 278 char* eid; 279 char buf[1024]; 280 281 /* First, it was wrong for hi_add_to_reqs() to be called for an ACK as acks are 282 * really responses to MESSAGEs. Thus undo that action here. */ 283 284 hi_del_from_reqs(io, resp); 285 286 sublen = resp->ad.stomp.subsc ? (strchr(resp->ad.stomp.subsc, '\n') - resp->ad.stomp.subsc) : 0; 287 midlen = resp->ad.stomp.msg_id ?(strchr(resp->ad.stomp.msg_id, '\n')- resp->ad.stomp.msg_id) : 0; 288 siglen = resp->ad.stomp.zx_rcpt_sig ? (strchr(resp->ad.stomp.zx_rcpt_sig, '\n') - resp->ad.stomp.zx_rcpt_sig) : 0; 289 290 DD("ACK subsc(%.*s) msg_id(%.*s) zx_rcpt_sig(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:"", siglen, siglen?resp->ad.stomp.zx_rcpt_sig:""); 291 292 if (!stomp_find_pending_req_for_resp(io, resp)) { 293 ERR("Unsolicited ACK subsc(%.*s) msg_id(%.*s)", sublen, sublen?resp->ad.stomp.subsc:"", midlen, midlen?resp->ad.stomp.msg_id:""); 294 return; 295 } 296 parent = resp->parent; 297 ASSERT(parent); 298 299 if (errmac_debug>1) 300 D("ACK par_%p->len=%d rq_%p->len=%d\nparent->body(%.*s)\n req->body(%.*s)", parent, parent->ad.delivb.len, resp->req, resp->req->ad.stomp.len, parent->ad.delivb.len, parent->ad.delivb.body, resp->req->ad.stomp.len, resp->req->ad.stomp.body); 301 else 302 D("ACK par_%p->len=%d rq_%p->len=%d", parent, parent->ad.delivb.len, resp->req, resp->req->ad.stomp.len); 303 304 eid = zxid_my_ent_id_cstr(zxbus_cf); 305 ver = zxbus_verify_receipt(zxbus_cf, io->ent->eid, 306 siglen, siglen?resp->ad.stomp.zx_rcpt_sig:"", 307 -2, resp->req->ad.stomp.msg_id, 308 -2, resp->req->ad.stomp.dest, 309 -1, eid, /* our eid, the receipt was issued to us */ 310 resp->req->ad.stomp.len, resp->req->ad.stomp.body); 311 ZX_FREE(zxbus_cf->ctx, eid); 312 if (ver != ZXSIG_OK) { 313 ERR("ACK signature validation failed: %d", ver); 314 hi_free_resp(hit, resp, "ack "); 315 return; 316 } 317 318 /* Record the receipt in /var/zxid/bus/ch/DEST/.ack/SHA1.ack for our audit trail, and to 319 * indicate that we need not attempt delivery again to this entity. */ 320 write_all_fd_fmt(parent->ad.delivb.ack_fd, "ACK", sizeof(buf), buf, "AB1 %s ACK %.*s\n", 321 io->ent->eid, siglen, siglen?resp->ad.stomp.zx_rcpt_sig:""); 322 323 hi_free_resp(hit, resp, "ack "); 324 325 if (--(parent->ad.delivb.acks) <= 0) { 326 ASSERTOPI(parent->ad.delivb.acks, ==, 0); 327 close_file(parent->ad.delivb.ack_fd, "got_ack"); 328 if (!parent->ad.delivb.nacks) { 329 D("Delivered to all: mv msg to .del par_%p", parent); 330 zxbus_retire(hit, parent); 331 } 332 hi_free_req(hit, parent, "parent "); 333 } 334 } 335 336 /* Called by: stomp_decode */ 337 static void stomp_got_subsc(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 338 { 339 HI_SANITY(hit->shf, hit); 340 if (zxbus_subscribe(hit, io, req)) { 341 /* N.B. The receipt was already sent. It needs to be sent before 342 * scheduling pending deliveries, lest the simple listener clients 343 * get confused by seeing a MESSAGE when expecting RECEIPT. */ 344 } else { 345 ERR("Subscribe Problem. Disk full? %d", 0); 346 } 347 } 348 349 /* Called by: stomp_decode */ 350 static void stomp_got_unsubsc(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 351 { 352 stomp_cmd_ni(hit,io,req,"UNSUBSCRIBE\n"); 353 } 354 355 /*() Nonstandard STOMP command for ZXBUS testing. 356 * Based on different body content, diffrent magic can be invoked: 357 * dump - dump data structures to stdout with hi_dump() */ 358 359 /* Called by: stomp_decode */ 360 static void stomp_got_zxctl(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req) 361 { 362 D("ZXCTL(%.*s)", req->ad.stomp.len, req->ad.stomp.body); 363 if (!memcmp(req->ad.stomp.body, "dump", sizeof("dump")-1)) hi_dump(hit->shf); 364 stomp_send_receipt(hit, io, req); 365 } 366 367 /*() Parse STOMP 1.1 header to a struct. */ 368 369 /* Called by: stomp_decode, stomp_parse_pdu */ 370 void stomp_parse_header(struct hi_pdu* req, char* hdr, char* val) 371 { 372 #define HDR(header, field, valu) } else if (!memcmp(hdr, header, sizeof(header)-1)) { if (!req->ad.stomp.field) req->ad.stomp.field = (valu) 373 374 if (!memcmp(hdr, "content-length:", sizeof("content-length:")-1)) { 375 if (!req->ad.stomp.len) { 376 req->ad.stomp.len = atoi(val); 377 req->need = req->ad.stomp.len + val - req->m + 3; /* not accurate if more headers follow, but fix below */ 378 } 379 DD("len=%d need=%d (%.*s)", req->ad.stomp.len, req->need, val-hdr-1, hdr); 380 } else if (!memcmp(hdr, "content-type:", sizeof("content-type:")-1)) { /* ignore */ 381 } else if (!memcmp(hdr, "message:", sizeof("message:")-1)) { /* ignore */ 382 HDR("receipt:", receipt, val); 383 HDR("destination:", dest, val); 384 HDR("zx-rcpt-sig:", zx_rcpt_sig, val); 385 HDR("host:", host, val); 386 HDR("version:", vers, val); 387 HDR("accept-version:", acpt_vers, val); 388 HDR("server:", server, val); 389 HDR("heart-beat:", heart_bt, val); 390 HDR("login:", login, val); 391 HDR("passcode:", pw, val); 392 HDR("session:", session, val); 393 HDR("transaction:", tx_id, val); 394 HDR("id:", subs_id, val); 395 HDR("subscription:", subsc, val); 396 HDR("ack:", ack, val); 397 HDR("message-id:", msg_id, val); 398 HDR("receipt-id:", rcpt_id, val); DD("receipt-id(%.*s)", 4, req->ad.stomp.rcpt_id); 399 } else { 400 D("Unknown header(%s) ignored.", hdr); 401 } 402 } 403 404 /*() STOMP decoder and dispatch. 405 * Return:: 0 for no error (including need more and PDU complete and processed), 406 * 1 to force closing connection. */ 407 408 /* Called by: hi_read x2 */ 409 int stomp_decode(struct hi_thr* hit, struct hi_io* io) 410 { 411 struct hi_pdu* req = io->cur_pdu; 412 char* command; 413 char* hdr; 414 char* val; 415 char* p = req->m; 416 417 D("decode req(%p)->need=%d (%.*s)", req, req->need, (int)MIN(req->ap - req->m, 3), req->m); 418 419 HI_SANITY(hit->shf, hit); 420 421 /* Skip newlines and check size */ 422 423 for (; p < req->ap && ONE_OF_2(*p, '\n', '\r'); ++p) ; 424 425 if (req->ap - p < STOMP_MIN_PDU_SIZE) { /* too little, need more */ 426 req->need = STOMP_MIN_PDU_SIZE; 427 D("need=%d have=%d", req->need, (int)(req->ap - req->m)); 428 return HI_NEED_MORE; 429 } 430 431 /* Extract command */ 432 433 command = p; 434 hdr = memchr(p, '\n', req->ap - p); 435 if (!hdr || ++hdr == req->ap) { 436 req->need = MAX(STOMP_MIN_PDU_SIZE, req->ap - req->m + 2); 437 D("need=%d have=%d", req->need, (int)(req->ap - req->m)); 438 return HI_NEED_MORE; 439 } 440 memset(&req->ad.stomp, 0, sizeof(req->ad.stomp)); 441 p = hdr; 442 443 /* Decode headers 444 * 012345678901234567890 445 * STOMP\n\n\0 446 * ^-p 447 * STOMP\r\n\r\n\0 448 * ^-p 449 * STOMP\nhost:foo\n\n\0 450 * ^-p ^-pp 451 * STOMP\r\nhost:foo\r\n\r\n\0 452 * ^-p ^-pp 453 * STOMP\nhost:foo\naccept-version:1.1\n\n\0 454 * ^-p ^-pp ^-ppp 455 * STOMP\r\nhost:foo\r\naccept-version:1.1\r\n\r\n\0 456 * ^-p ^-pp ^-ppp 457 */ 458 459 while (!ONE_OF_2(*p,'\n','\r')) { 460 hdr = p; 461 p = memchr(p, '\n', req->ap - p); 462 if (!p || ++p == req->ap) { 463 req->need = MAX(STOMP_MIN_PDU_SIZE, req->ap - req->m + 2); 464 D("need=%d have=%d", req->need, (int)(req->ap - req->m)); 465 return HI_NEED_MORE; 466 } 467 val = memchr(hdr, ':', p-hdr); 468 if (!val) 469 return stomp_frame_err(hit, io, req, "Header missing colon."); 470 ++val; /* skip : */ 471 stomp_parse_header(req, hdr, val); 472 } 473 474 /* Now body */ 475 476 if (*p == '\r') ++p; 477 req->ad.stomp.body = ++p; 478 479 /* p now points to first byte of body (after \n\n; at nul if no body) */ 480 481 if (req->ad.stomp.len) { 482 req->need = p - req->m + req->ad.stomp.len + 1 /* nul */; /* req->need has to be set correctly for hi_checkmore() to work right. */ 483 if (req->ad.stomp.len < req->ap - p) { 484 /* Got complete with content-length */ 485 p += req->ad.stomp.len; 486 if (*p++) 487 return stomp_frame_err(hit, io, req, "No nul to terminate body."); 488 } else { 489 D("need=%d have=%d", req->need, (int)(req->ap - req->m)); 490 return HI_NEED_MORE; 491 } 492 } else { 493 /* Scan until nul */ 494 while (1) { 495 if (req->ap - p < 1) { /* too little, need more */ 496 req->need = req->ap - req->m + 1; 497 D("need=%d have=%d", req->need, (int)(req->ap - req->m)); 498 return HI_NEED_MORE; 499 } 500 if (!*p++) { /* nul termination found */ 501 req->ad.stomp.len = p - req->ad.stomp.body - 1; 502 req->need = p - req->m; 503 break; 504 } 505 } 506 } 507 508 HI_SANITY(hit->shf, hit); 509 hi_add_to_reqs(hit, io, req, STOMP_MIN_PDU_SIZE); 510 HI_SANITY(hit->shf, hit); 511 512 /* Command dispatch */ 513 514 #define CMD(vrb, action) } else if (!memcmp(command, vrb, sizeof(vrb)-1)) { action 515 516 if (!memcmp(command, "STOMP", sizeof("STOMP")-1) 517 || (!memcmp(command, "CONNECT", sizeof("CONNECT")-1) 518 &&ONE_OF_2(command[sizeof("CONNECT")],'\n','\r'))) 519 { return stomp_got_login(hit,io,req); 520 CMD("SEND", stomp_got_send(hit,io,req) ); 521 CMD("ACK", stomp_got_ack(hit,io,req) ); 522 CMD("DISCONNECT", return stomp_got_disc(hit,io,req) ); 523 CMD("SUBSCRIBE", stomp_got_subsc(hit,io,req) ); 524 CMD("UNSUBSCRIBE", stomp_got_unsubsc(hit,io,req) ); 525 CMD("BEGIN", stomp_cmd_ni(hit,io,req,p) ); 526 CMD("COMMIT", stomp_cmd_ni(hit,io,req,p) ); 527 CMD("ABORT", stomp_cmd_ni(hit,io,req,p) ); 528 CMD("NACK", stomp_got_nack(hit,io,req) ); 529 /* Commands client would see (sent by server) */ 530 CMD("CONNECTED", stomp_cmd_ni(hit,io,req,p) ); 531 CMD("MESSAGE", stomp_cmd_ni(hit,io,req,p) ); 532 CMD("RECEIPT", stomp_cmd_ni(hit,io,req,p) ); 533 CMD("ERROR", return stomp_got_err(hit,io,req) ); 534 CMD("ZXCTL", stomp_got_zxctl(hit,io,req) ); /* Custom command */ 535 } else { 536 D("Unknown command(%.*s) ignored.", 4, command); 537 stomp_cmd_ni(hit,io,req,p); 538 } 539 return 0; 540 } 541 542 /*() Parse PDU to extract headers. Typically this is called 543 * to recover a persisted PDU, i.e. we can assume to have 544 * all the data at hand already. This simplifies error reporting. 545 * return:: 0 on success, 1 on error 546 * see also:: stomp_decode() is usually used for decoring network traffic */ 547 548 /* Called by: zxbus_sched_pending_delivery */ 549 int stomp_parse_pdu(struct hi_pdu* pdu) 550 { 551 char* hdr; 552 char* val; 553 char* p; 554 555 /* Parse STOMP message */ 556 557 memset(&pdu->ad.stomp, 0, sizeof(pdu->ad.stomp)); 558 p = pdu->m; 559 hdr = memchr(p, '\n', pdu->ap - p); 560 if (!hdr || ++hdr == pdu->ap) { 561 pdu->need = MAX(STOMP_MIN_PDU_SIZE, pdu->ap - pdu->m + 2); 562 ERR("PDU from file is too small. need=%d have=%d", pdu->need, (int)(pdu->ap - pdu->m)); 563 return 1; 564 } 565 p = hdr; 566 567 while (!ONE_OF_2(*p,'\n','\r')) { 568 hdr = p; 569 p = memchr(p, '\n', pdu->ap - p); 570 if (!p || ++p == pdu->ap) { 571 pdu->need = MAX(STOMP_MIN_PDU_SIZE, pdu->ap - pdu->m + 2); 572 ERR("need=%d have=%d", pdu->need, (int)(pdu->ap - pdu->m)); 573 return 1; 574 } 575 val = memchr(hdr, ':', p-hdr); 576 if (!val) { 577 ERR("Malformed PDU from file. Header missing a colon. %p", pdu); 578 return 1; 579 } 580 ++val; /* skip : */ 581 stomp_parse_header(pdu, hdr, val); 582 } 583 584 if (*p == '\r') ++p; 585 pdu->ad.stomp.body = ++p; 586 587 if (pdu->ad.stomp.len) { 588 pdu->need = p - pdu->m + pdu->ad.stomp.len + 1 /* nul */; 589 if (pdu->ad.stomp.len < pdu->ap - p) { 590 /* Got complete with content-length */ 591 p += pdu->ad.stomp.len; 592 if (*p++) { 593 ERR("Malformed PDU from file: No nul to terminate body. %x", p[-1]); 594 } 595 } else { 596 ERR("PDU from file has specified length(%d), but not enough data. need=%d have=%d", pdu->ad.stomp.len, pdu->need, (int)(pdu->ap - pdu->m)); 597 return 1; 598 } 599 return 0; 600 } else { 601 /* Scan until nul */ 602 while (1) { 603 if (pdu->ap - p < 1) { /* too little, need more */ 604 pdu->need = pdu->ap - pdu->m + 1; 605 ERR("PDU from file without length but too short to have even nul termination. need=%d have=%d", pdu->need, (int)(pdu->ap - pdu->m)); 606 return 1; 607 } 608 if (!*p++) { /* nul termination found */ 609 pdu->ad.stomp.len = p - pdu->ad.stomp.body - 1; 610 pdu->need = p - pdu->m; 611 return 0; 612 } 613 } 614 } 615 return 0; 616 } 617 618 /* EOF -- stomp.c */ 619