1 /* zxbusdist.c  -  Message persist and distribution, subscription management
2  * Copyright (c) 2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: http.c may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 16.8.2012, created --Sampo
11  * 30.8.2012, added subscription mechanisms --Sampo
12  * 5.9.2012,  separated entity management and subscriptions to own source files --Sampo
13  *
14  * Subscriptions are organized by destination channel. First the
15  * channel is looked up and then list of entity objects is chased to
16  * deliver the message to each of them. If an entity is currently
17  * logged in, it will have an io object and we deliver
18  * immediately. Entities that are subscribed, but not currently logged
19  * in will cause pending delivery. In fact all entities should be
20  * considered pending until ACK has been received WRT to the message.
21  * Hence we opt to remember the ACK'd entities rather than the pending
22  * ones. (A slight side effect of this is that you can subscribe to a
23  * channel and receive messages that were generated prior to your
24  * subscription having been created.)
25  *
26  * The records about ACKs are kept in append only files under
27  * ch/DEST/.ack/SHA1.ack which has one per line the entity IDs that
28  * have ACKd (AB0 format). This file might also contain cryptographic
29  * signature proof of the ACK. In that case the first 3 characters of
30  * the line specify the version and type of line (initially "AB1 " -
31  * note the space), followed by the entity id and safe-base64 of the
32  * signature, separated by a space (n.b. the filename itself is a SHA1
33  * hash of the content of the message).
34  *
35  * When it has been determined that all entities have ACKd a message
36  * it is retired either by deleting it or by moving it to .del/
37  * directory.  The latter option allows some post processing before
38  * removal - or simply can act as a convenient cache of recent
 * messages for debugging purposes.  .del/ SHOULD be cleaned
 * periodically by a cron job.
41  *
42  * When an entity sends SUBSCRIBE command, all messages pending for
43  * the entity on that channel are sent. This will force scanning the
44  * ACK receipt files to determine if all subscribers have ACKd so that
45  * message retirement can be triggered. To avoid a full matrix of acks
46  * and nacks, we maintain counts. *** counts may be thrown off if an
47  * entity joins in middle of delivery attempt.
48  *
49  * Number of channels is expected to be relatively small, except for
50  * per user channels that are handled as a special case. The number of
51  * subscribers to common channels is expected to be extremely large,
52  * The number of subscribers for per user channel are expected to be
53  * relatively small. Thus alternative structure is to simply scan the
54  * io object array. This has the advantage of not maintaining a
55  * separate data structure that would require additional pointer
56  * fields and additional locking.
57  *
58  * Persistent store of channel subscriptions is realized by having in
59  * each channel directory a special file .subs which lists the entity
60  * IDs of the subscribers, one per line. When zxbusd starts, it loads
61  * this persisted data to memory. When new subscriptions are made, the
62  * subscription is "written through" to persistent storage and the
63  * in-memory data structure is updated as well.
64  */
65 
66 #include "platform.h"
67 #include "errmac.h"
68 #include "akbox.h"
69 #include "hiios.h"
70 #include "hiproto.h"
71 #include <zx/zxidconf.h>
72 #include <zx/zxidutil.h>
73 
74 #define __USE_GNU 1  /* for O_DIRECT */
75 
76 #include <ctype.h>
77 #include <memory.h>
78 #include <stdlib.h>
79 #include <netinet/in.h> /* htons(3) and friends */
80 #include <sys/types.h>
81 #include <sys/stat.h>
82 #include <fcntl.h>
83 #include <errno.h>
84 
/* Alias some struct fields for headers that can not be seen together.
 * Each alias is a plain preprocessor substitution: from here to the end of
 * this file the name on the left is rewritten to the field name on the right.
 * Several aliases deliberately share one target (e.g. receipt and rcpt_id
 * both map to host), so at most one alias per target is meaningful for any
 * given PDU.
 * N.B. The substitution is purely textual and is not limited to struct
 * member access: a local variable named e.g. ack is rewritten to pw as well.
 * Avoid introducing unrelated identifiers that collide with these names. */
#define receipt   host
#define rcpt_id   host
#define acpt_vers vers
#define tx_id     vers
#define session   login
#define subs_id   login
#define subsc     login
#define server    pw
#define ack       pw
#define msg_id    pw
#define heart_bt  dest
#define zx_rcpt_sig dest

extern int verbose;  /* defined in option parsing in zxbusd.c */
extern zxid_conf* zxbus_cf;   /* configuration object handed to zxbus_persist_msg() */
extern char* zxbus_path;      /* path prefix under which the channel directories are formed */
102 
103 /*() Read the .ack/SHA1 file for a message and parse it into linked
104  * lists of hi_ack nodes attached to entities. The file consists
105  * of lines like
106  *   AB1 eid ACK sig
107  * The pdu should be the delivery or pending bitch PDU.
108  * locking:: whill take hit->shf->ent_mut */
109 
110 /* Called by:  zxbus_sched_pending_delivery */
zxbus_load_acks(struct hi_thr * hit,struct hi_pdu * pdu,int fd)111 static void zxbus_load_acks(struct hi_thr* hit, struct hi_pdu* pdu, int fd)
112 {
113   struct hi_ack* ack;
114   struct hi_ent* ent;
115   char* p;
116   char* nl;
117   char* aa;
118   char* buf;
119   int gotall, len = get_file_size(fd);
120   ZMALLOCN(buf, len+1);
121   if (read_all_fd(fd, buf, len, &gotall) == -1) {
122     D("reading acks failed gotall=%d",gotall);
123     FREE(buf);
124     return;
125   }
126   buf[gotall] = 0;
127 
128   LOCK(hit->shf->ent_mut, "load-acks");  // *** very big lock
129   D("LOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
130   for (p = buf; p < buf+gotall; p = nl+1) {
131     if (!(nl = strchr(p, '\n')))
132       nl = buf+gotall;
133     if (!memcmp(p, "AB1 ", sizeof("AB1 ")-1)) {
134       if (aa = zx_memmem(p+sizeof("AB1 ")-1, nl-p, " ACK", sizeof(" ACK")-1)) {
135 	if (ent = zxbus_load_ent(hit->shf, aa-(p+sizeof("AB1 ")-1), p+sizeof("AB1 ")-1)) {
136 	  ZMALLOC(ack);
137 	  ack->pdu = pdu;
138 	  ack->n = ent->acks;
139 	  ent->acks = ack;
140 	  D("Added ack pdu_%p to ent_%p eid(%s)", pdu, ent, ent->eid);
141 	} else {
142 	  ERR("Entity of the ACK not found. line(%.*s), skipping", (int)(nl-p), p);
143 	}
144       } else {
145 	ERR("Not an ACK line(%.*s) in acks, skipping", (int)(nl-p), p);
146       }
147     } else {
148       ERR("Bad line(%.*s) in acks, skipping", (int)(nl-p), p);
149     }
150   }
151   D("UNLOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
152   UNLOCK(hit->shf->ent_mut, "load-acks");
153   FREE(buf);
154 }
155 
156 /*() Check if pdu is in the entity's already acked list.
157  * The pdu should be the delivery or pending bitch PDU.
158  * locking:: caller MUST hold shf->ent_mut
159  * return:: 1 if found (and as side effect remove), 0=not found */
160 
161 /* Called by:  stomp_msg_deliver */
zxbus_already_ackd(struct hi_ent * ent,struct hi_pdu * pdu)162 static int zxbus_already_ackd(struct hi_ent* ent, struct hi_pdu* pdu)
163 {
164   struct hi_ack* prev;
165   struct hi_ack* ack = ent->acks;
166   D("Checking ent_%p eid(%s) io(%x) acks_%p pdu_%p", ent, ent->eid, ent->io?ent->io->fd:0xdeadbeef, ack, pdu);
167   if (!ack)
168     return 0;
169   if (ack->pdu == pdu) {
170     D("Already ACKd by eid(%s)", ent->eid);
171     ent->acks = ack->n;
172     FREE(ack);
173     return 1;
174   }
175   for (prev = ack, ack = ack->n; ack; prev = ack, ack = ack->n)
176     if (ack->pdu == pdu) {
177       D("Already ACKd by eid(%s)", ent->eid);
178       prev->n = ack->n;
179       FREE(ack);
180       return 1;
181     }
182   return 0;
183 }
184 
185 /*() Handle special "delivery bitch" PDU that represents need to
186  * send a message to listeners of a channel (aka destination).
187  * zxbus_persist() creates a synthetic PDU which is scheduled for the delivery
188  * work in todo queue. This PDU is not associated to any particular
189  * io object and will keep on rescheduling itself until its job
190  * has been done. At that point it will free itself.
191  * locking:: whill take hit->shf->ent_mut */
192 
193 /* Called by:  hi_shuffle */
stomp_msg_deliver(struct hi_thr * hit,struct hi_pdu * db_pdu)194 void stomp_msg_deliver(struct hi_thr* hit, struct hi_pdu* db_pdu)
195 {
196   struct hi_ent* ent;
197   struct hi_ch* ch;
198   int ch_num;
199   D("db_pdu(%p) events=0x%x", db_pdu, db_pdu->events);
200 
201   ch = zxbus_find_ch(hit->shf, -2, db_pdu->ad.delivb.dest);
202   if (!ch)
203     return;
204   ch_num = ch - hit->shf->chs;
205   LOCK(hit->shf->ent_mut, "deliver"); // *** very big lock, held across I/O. Consider per ent lock
206   for (ent = hit->shf->ents; ent; ent = ent->n)
207     if (ent->chs[ch_num]) {  /* entity listens on this channel? */
208       if (zxbus_already_ackd(ent, db_pdu)) {
209 	DD("Already ACKd eid(%s)", ent->eid);
210       } else if (ent->io && ent->chs[ch_num] == HI_SUBS_ON) {
211 	hi_sendf(hit, ent->io, db_pdu, 0,
212 		 "MESSAGE\nsubscription:%s\nmessage-id:%d\ndestination:%.*s\ncontent-length:%d\n\n%.*s%c",
213 		 "0", ent->io->ad.stomp.msgid++,
214 		 (strchr(db_pdu->ad.delivb.dest, '\n') - db_pdu->ad.delivb.dest), db_pdu->ad.delivb.dest,
215 		 db_pdu->ad.delivb.len, db_pdu->ad.delivb.len,
216 		 db_pdu->ad.delivb.body, 0);
217 	/* the receiving half will decrement  ++(int)db_pdu->ad.delivb.acks */
218 	++(db_pdu->ad.delivb.acks); /* number of ACKs pending due to MESSAGEs sent */
219       } else {
220 	D("Can not deliver. entity(%s) not connected at the moment?", ent->eid);
221 	ent->chs[ch_num] = HI_SUBS_PEND;
222 	++(db_pdu->ad.delivb.nacks);
223       }
224     }
225   UNLOCK(hit->shf->ent_mut, "deliver");
226 #if 0
227   if (db_pdu->ad.delivb.acks)  /* still something pending? */
228     hi_todo_produce(hit, &db_pdu->qel, "deliv-bitch-again", 0);
229   else
230     hi_free_req(hit, db_pdu, "db_pdu ");
231 #else
232   /* No rescheduling. Operate in one-shot mode: all connected ones get delivery attempt.
233    * The cleanup will happen when last ACK is received and db_pdu->ad.delivb.acks
234    * count has dropped to zero. Redelivery attempts later are handled separately. */
235 #endif
236 }
237 
238 /*() Schedule new delivery to happen. See stomp_msg_deliver() for what happens next.
239  * We create a synthetic PDU which is scheduled for the delivery
240  * work in todo queue. This PDU is not associated to any particular
241  * io object (and will keep on rescheduling itself until its job
242  * has been done (??? may be not)). At that point it will free itself.
243  * The acks will be written to ack_fd (to avoid 99% of the double delivery, and
244  * to have an audit trail on our side about deliveries. */
245 
246 /* Called by:  zxbus_persist */
zxbus_sched_new_delivery(struct hi_thr * hit,struct hi_pdu * req,const char * sha1name,int dest_len,const char * dest)247 static void zxbus_sched_new_delivery(struct hi_thr* hit, struct hi_pdu* req, const char* sha1name, int dest_len, const char* dest)
248 {
249   struct hi_pdu* pdu = hi_pdu_alloc(hit, "deliv-bitch");
250   pdu->qel.kind = HI_PDU_DIST;
251   memcpy(pdu->m, req->m, req->need);  /* copy PDU substance */
252   pdu->ap += req->need;
253   pdu->ad.delivb.len = req->ad.delivb.len;
254   pdu->ad.delivb.body = pdu->m + (req->ad.stomp.body - req->m);
255   pdu->ad.delivb.dest = pdu->m + (req->ad.stomp.dest - req->m);
256 
257   pdu->ad.delivb.acks = 0;
258   pdu->ad.delivb.nacks = 0;
259 
260   //  | O_DIRECT  -- seems to give alignment problems, i.e. 22 EINVAL Invalid Argument
261   pdu->ad.delivb.ack_fd = open_fd_from_path(O_CREAT | O_WRONLY | O_APPEND | O_SYNC, 0666, "sched deliv", 1, "%s" ZXBUS_CH_DIR "%.*s/.ack/%s", zxbus_path, dest_len, dest, sha1name);
262   hi_todo_produce(hit, &pdu->qel, "deliv-bitch", 0);
263 }
264 
265 /*() Scan messages in channel directory and schedule pending ones for delivery.
266  * We avoid delivering to listeners that have already received the PDU
267  * by reading in the .ack/SHA1 file and attaching to pending bitch PDU
268  * a linked list of already successful entities. Linked list because
269  * it is simplest, but a hash table could be more effective. */
270 
271 /* Called by:  zxbus_subscribe */
zxbus_sched_pending_delivery(struct hi_thr * hit,const char * dest)272 void zxbus_sched_pending_delivery(struct hi_thr* hit, const char* dest)
273 {
274   char path[ZXID_MAX_BUF];
275   struct dirent* de;
276   DIR* dir;
277   struct hi_pdu* pdu;
278 
279   name_from_path(path, sizeof(path), "%s" ZXBUS_CH_DIR "%s", zxbus_path, dest);
280   dir = opendir(path);
281   if (!dir) {
282     perror("opendir for /var/zxid/bus/ch/DEST (or other if configured)");
283     D("failed path(%s) dest(%s)", path, dest);
284     return;
285   }
286 
287   while (de = readdir(dir))  /* iterate over messages in the channel directory */
288     if (de->d_name[0] != '.' && de->d_name[strlen(de->d_name)-1] != '~') { /* ign hidden&backup */
289       if (!(pdu = hi_pdu_alloc(hit, "pend-bitch")))
290 	break;
291       pdu->qel.kind = HI_PDU_DIST;
292       pdu->ap += read_all(pdu->lim - pdu->ap, pdu->ap, "pend-bitch", 1,
293 			  "%s" ZXBUS_CH_DIR "%s/%s", zxbus_path, dest, de->d_name);
294 
295       if (stomp_parse_pdu(pdu))
296 	continue;  /* parse error in PDU */
297 
298       pdu->ad.delivb.acks = 0;
299       pdu->ad.delivb.nacks = 0;
300 
301       //  | O_DIRECT  -- seems to give alignment problems, i.e. 22 EINVAL Invalid Argument
302       pdu->ad.delivb.ack_fd = open_fd_from_path(O_CREAT | O_RDWR | O_APPEND | O_SYNC, 0666, "pend", 1, "%s" ZXBUS_CH_DIR "%s/.ack/%s", zxbus_path, dest, de->d_name);
303       zxbus_load_acks(hit, pdu, pdu->ad.delivb.ack_fd);
304       hi_todo_produce(hit, &pdu->qel, "pend-bitch", 0);
305     }
306   closedir(dir);
307 }
308 
309 /*() Retire fully delivered message.
310  * The message is moved to .del/ for later removal if it exists,
311  * or just unlinked on the spot.
312  * return:: 0 on fail, 1 on rename to .del, 2 on unlink */
313 
314 /* Called by:  stomp_got_ack */
zxbus_retire(struct hi_thr * hit,struct hi_pdu * db_pdu)315 int zxbus_retire(struct hi_thr* hit, struct hi_pdu* db_pdu)
316 {
317   int len, dest_len;
318   char c_path[ZXID_MAX_BUF];  /* current channel path */
319   char d_path[ZXID_MAX_BUF];  /* .del path after atomic rename */
320 
321   dest_len = strchr(db_pdu->ad.delivb.dest, '\n')-db_pdu->ad.delivb.dest;
322   len = name_from_path(c_path, sizeof(c_path), "%sch/%.*s/", zxbus_path, dest_len, db_pdu->ad.delivb.dest);
323   if (sizeof(c_path)-len < 28+5 /* +5 accounts for d_path having 5 more chars (.del/) */) {
324     ERR("The c_path for retiring exceeds limit. len=%d", len);
325     return 0;
326   }
327   DD("c_path(%s) len=%d", c_path, len);
328   DD("sha1_input(%.*s) len=%d", db_pdu->ap - db_pdu->m, db_pdu->m, db_pdu->ap - db_pdu->m);
329   sha1_safe_base64(c_path+len, db_pdu->ap - db_pdu->m, db_pdu->m);
330   c_path[len+27] = 0;
331   DD("c_path(%s)", c_path);
332 
333   len = name_from_path(d_path, sizeof(d_path), "%sch/%.*s/.del/%s", zxbus_path, dest_len, db_pdu->ad.delivb.dest, c_path+len);
334   DD("d_path(%s)", d_path);
335 
336   if (!rename(c_path, d_path))
337     return 1;
338 
339   D("Retire: Renaming file(%s) to(%s) failed: %d %s. Defaulting to deleting the file altogether. Check permissions and that directories exist if you do not want deletion. For rename(2) to work, directories must be on the same filesystem. euid=%d egid=%d", c_path, d_path, errno, STRERROR(errno), geteuid(), getegid());
340 
341   if (!unlink(c_path))
342     return 2;
343 
344   ERR("Retire: Renaming file(%s) to(%s) as well as unlinking it failed: %d %s. Check permissions and that directories exist and that they are on the same filesystem. euid=%d egid=%d", c_path, d_path, errno, STRERROR(errno), geteuid(), getegid());
345   return 0;
346 }
347 
348 /*() Attempt to presist a message.
349  * Persisting involves synchronous write and an atomic filesystem rename
350  * operation, ala Maildir. The persisted message is a file that contains
351  * the entire STOMP 1.1 PDU including headers and body. Filename is the sha1
352  * hash of the contents of the file.
353  * return:: 0 on failure, 1 on success.
354  * see also:: persist feature in zxbus_listen_msg() */
355 
356 /* Called by:  stomp_got_send */
zxbus_persist(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req)357 int zxbus_persist(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
358 {
359   int len, dest_len;
360   char* dest;
361   char* nl;
362   char c_path[ZXID_MAX_BUF];  /* channel destination path after atomic rename */
363 
364   if (!(dest = req->ad.stomp.dest)) {
365     stomp_err(hit,io,req,"no destination - client error","SEND MUST specify destination header, i.e. channel to send to.");
366     return 0;
367   }
368   nl = memchr(dest, '\n', req->ap - dest);
369   dest_len = nl-dest;
370   DD("persist(%.*s)", dest_len, dest);
371 
372   if (!(len = zxbus_persist_msg(zxbus_cf, sizeof(c_path), c_path,
373 			       dest_len, dest, req->ap-req->m, req->m))) {
374     stomp_err(hit,io,req,"persist failure at server","Unable to persist message. Can not guarantee reliable delivery, therefore rejecting.");
375     /* *** should we make an effort to close the connection? */
376     return 0;
377   }
378   D("persisted at(%s) (%.*s) len=%d", c_path, (int)MIN(req->ap-req->ad.stomp.body, 10), req->ad.stomp.body, (int)(req->ap-req->m));
379   if (verbose) {
380     if (req->ad.stomp.receipt)
381       nl = memchr(req->ad.stomp.receipt, '\n', req->ap - req->ad.stomp.receipt);
382     else
383       nl = 0;
384     printf("FMT0 persist at %s '%.*s' len=%d rcpt(%.*s)\n", c_path, (int)MIN(req->ap-req->ad.stomp.body, 10), req->ad.stomp.body, (int)(req->ap-req->m), nl?((int)(nl-req->ad.stomp.receipt)):0, nl?req->ad.stomp.receipt:"");
385   }
386   zxbus_sched_new_delivery(hit, req, c_path+len-27, dest_len, dest);
387   return 1;
388 }
389 
390 /* EOF  --  zxbusdist.c */
391