1 /* zxbusdist.c - Message persist and distribution, subscription management
2 * Copyright (c) 2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3 * This is confidential unpublished proprietary source code of the author.
4 * NO WARRANTY, not even implied warranties. Contains trade secrets.
5 * Distribution prohibited unless authorized in writing. See file COPYING.
6 * Special grant: http.c may be used with zxid open source project under
7 * same licensing terms as zxid itself.
8 * $Id$
9 *
10 * 16.8.2012, created --Sampo
11 * 30.8.2012, added subscription mechanisms --Sampo
12 * 5.9.2012, separated entity management and subscriptions to own source files --Sampo
13 *
14 * Subscriptions are organized by destination channel. First the
15 * channel is looked up and then list of entity objects is chased to
16 * deliver the message to each of them. If an entity is currently
17 * logged in, it will have an io object and we deliver
18 * immediately. Entities that are subscribed, but not currently logged
19 * in will cause pending delivery. In fact all entities should be
20 * considered pending until ACK has been received with respect to the message.
21 * Hence we opt to remember the ACK'd entities rather than the pending
22 * ones. (A slight side effect of this is that you can subscribe to a
23 * channel and receive messages that were generated prior to your
24 * subscription having been created.)
25 *
26 * The records about ACKs are kept in append only files under
27 * ch/DEST/.ack/SHA1.ack which has one per line the entity IDs that
28 * have ACKd (AB0 format). This file might also contain cryptographic
29 * signature proof of the ACK. In that case the first 3 characters of
30 * the line specify the version and type of line (initially "AB1 " -
31 * note the space), followed by the entity id and safe-base64 of the
32 * signature, separated by a space (n.b. the filename itself is a SHA1
33 * hash of the content of the message).
34 *
35 * When it has been determined that all entities have ACKd a message
36 * it is retired either by deleting it or by moving it to .del/
37 * directory. The latter option allows some post processing before
38 * removal - or simply can act as a convenient cache of recent
39 * messages for debugging purposes. .del/ SHOULD be cleaned
40 * periodically by a cron job.
41 *
42 * When an entity sends SUBSCRIBE command, all messages pending for
43 * the entity on that channel are sent. This will force scanning the
44 * ACK receipt files to determine if all subscribers have ACKd so that
45 * message retirement can be triggered. To avoid a full matrix of acks
46 * and nacks, we maintain counts. *** counts may be thrown off if an
47 * entity joins in middle of delivery attempt.
48 *
49 * Number of channels is expected to be relatively small, except for
50 * per user channels that are handled as a special case. The number of
51 * subscribers to common channels is expected to be extremely large.
52 * The number of subscribers for a per user channel is expected to be
53 * relatively small. Thus an alternative structure is to simply scan the
54 * io object array. This has the advantage of not maintaining a
55 * separate data structure that would require additional pointer
56 * fields and additional locking.
57 *
58 * Persistent store of channel subscriptions is realized by having in
59 * each channel directory a special file .subs which lists the entity
60 * IDs of the subscribers, one per line. When zxbusd starts, it loads
61 * this persisted data to memory. When new subscriptions are made, the
62 * subscription is "written through" to persistent storage and the
63 * in-memory data structure is updated as well.
64 */
65
66 #include "platform.h"
67 #include "errmac.h"
68 #include "akbox.h"
69 #include "hiios.h"
70 #include "hiproto.h"
71 #include <zx/zxidconf.h>
72 #include <zx/zxidutil.h>
73
74 #define __USE_GNU 1 /* for O_DIRECT */
75
76 #include <ctype.h>
77 #include <memory.h>
78 #include <stdlib.h>
79 #include <netinet/in.h> /* htons(3) and friends */
80 #include <sys/types.h>
81 #include <sys/stat.h>
82 #include <fcntl.h>
83 #include <errno.h>
84
85 /* Alias some struct fields for headers that can not be seen together. NB: these are plain preprocessor token replacements, so from here on every identifier in this file spelled like an alias (struct member or local variable alike, e.g. a local named ack) is rewritten to the right hand side name. */
86 #define receipt host
87 #define rcpt_id host
88 #define acpt_vers vers
89 #define tx_id vers
90 #define session login
91 #define subs_id login
92 #define subsc login
93 #define server pw
94 #define ack pw
95 #define msg_id pw
96 #define heart_bt dest
97 #define zx_rcpt_sig dest
98
99 extern int verbose; /* defined in option parsing in zxbusd.c */
100 extern zxid_conf* zxbus_cf; /* configuration handed to zxbus_persist_msg() when persisting */
101 extern char* zxbus_path; /* path prefix under which the channel directories are formed */
102
103 /*() Read the .ack/SHA1 file for a message and parse it into linked
104 * lists of hi_ack nodes attached to entities. The file consists
105 * of lines like
106 * AB1 eid ACK sig
107 * The pdu should be the delivery or pending bitch PDU.
108 * locking:: whill take hit->shf->ent_mut */
109
110 /* Called by: zxbus_sched_pending_delivery */
zxbus_load_acks(struct hi_thr * hit,struct hi_pdu * pdu,int fd)111 static void zxbus_load_acks(struct hi_thr* hit, struct hi_pdu* pdu, int fd)
112 {
113 struct hi_ack* ack;
114 struct hi_ent* ent;
115 char* p;
116 char* nl;
117 char* aa;
118 char* buf;
119 int gotall, len = get_file_size(fd);
120 ZMALLOCN(buf, len+1);
121 if (read_all_fd(fd, buf, len, &gotall) == -1) {
122 D("reading acks failed gotall=%d",gotall);
123 FREE(buf);
124 return;
125 }
126 buf[gotall] = 0;
127
128 LOCK(hit->shf->ent_mut, "load-acks"); // *** very big lock
129 D("LOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
130 for (p = buf; p < buf+gotall; p = nl+1) {
131 if (!(nl = strchr(p, '\n')))
132 nl = buf+gotall;
133 if (!memcmp(p, "AB1 ", sizeof("AB1 ")-1)) {
134 if (aa = zx_memmem(p+sizeof("AB1 ")-1, nl-p, " ACK", sizeof(" ACK")-1)) {
135 if (ent = zxbus_load_ent(hit->shf, aa-(p+sizeof("AB1 ")-1), p+sizeof("AB1 ")-1)) {
136 ZMALLOC(ack);
137 ack->pdu = pdu;
138 ack->n = ent->acks;
139 ent->acks = ack;
140 D("Added ack pdu_%p to ent_%p eid(%s)", pdu, ent, ent->eid);
141 } else {
142 ERR("Entity of the ACK not found. line(%.*s), skipping", (int)(nl-p), p);
143 }
144 } else {
145 ERR("Not an ACK line(%.*s) in acks, skipping", (int)(nl-p), p);
146 }
147 } else {
148 ERR("Bad line(%.*s) in acks, skipping", (int)(nl-p), p);
149 }
150 }
151 D("UNLOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
152 UNLOCK(hit->shf->ent_mut, "load-acks");
153 FREE(buf);
154 }
155
156 /*() Check if pdu is in the entity's already acked list.
157 * The pdu should be the delivery or pending bitch PDU.
158 * locking:: caller MUST hold shf->ent_mut
159 * return:: 1 if found (and as side effect remove), 0=not found */
160
161 /* Called by: stomp_msg_deliver */
zxbus_already_ackd(struct hi_ent * ent,struct hi_pdu * pdu)162 static int zxbus_already_ackd(struct hi_ent* ent, struct hi_pdu* pdu)
163 {
164 struct hi_ack* prev;
165 struct hi_ack* ack = ent->acks;
166 D("Checking ent_%p eid(%s) io(%x) acks_%p pdu_%p", ent, ent->eid, ent->io?ent->io->fd:0xdeadbeef, ack, pdu);
167 if (!ack)
168 return 0;
169 if (ack->pdu == pdu) {
170 D("Already ACKd by eid(%s)", ent->eid);
171 ent->acks = ack->n;
172 FREE(ack);
173 return 1;
174 }
175 for (prev = ack, ack = ack->n; ack; prev = ack, ack = ack->n)
176 if (ack->pdu == pdu) {
177 D("Already ACKd by eid(%s)", ent->eid);
178 prev->n = ack->n;
179 FREE(ack);
180 return 1;
181 }
182 return 0;
183 }
184
185 /*() Handle special "delivery bitch" PDU that represents need to
186 * send a message to listeners of a channel (aka destination).
187 * zxbus_persist() creates a synthetic PDU which is scheduled for the delivery
188 * work in todo queue. This PDU is not associated to any particular
189 * io object and will keep on rescheduling itself until its job
190 * has been done. At that point it will free itself.
191 * locking:: whill take hit->shf->ent_mut */
192
193 /* Called by: hi_shuffle */
stomp_msg_deliver(struct hi_thr * hit,struct hi_pdu * db_pdu)194 void stomp_msg_deliver(struct hi_thr* hit, struct hi_pdu* db_pdu)
195 {
196 struct hi_ent* ent;
197 struct hi_ch* ch;
198 int ch_num;
199 D("db_pdu(%p) events=0x%x", db_pdu, db_pdu->events);
200
201 ch = zxbus_find_ch(hit->shf, -2, db_pdu->ad.delivb.dest);
202 if (!ch)
203 return;
204 ch_num = ch - hit->shf->chs;
205 LOCK(hit->shf->ent_mut, "deliver"); // *** very big lock, held across I/O. Consider per ent lock
206 for (ent = hit->shf->ents; ent; ent = ent->n)
207 if (ent->chs[ch_num]) { /* entity listens on this channel? */
208 if (zxbus_already_ackd(ent, db_pdu)) {
209 DD("Already ACKd eid(%s)", ent->eid);
210 } else if (ent->io && ent->chs[ch_num] == HI_SUBS_ON) {
211 hi_sendf(hit, ent->io, db_pdu, 0,
212 "MESSAGE\nsubscription:%s\nmessage-id:%d\ndestination:%.*s\ncontent-length:%d\n\n%.*s%c",
213 "0", ent->io->ad.stomp.msgid++,
214 (strchr(db_pdu->ad.delivb.dest, '\n') - db_pdu->ad.delivb.dest), db_pdu->ad.delivb.dest,
215 db_pdu->ad.delivb.len, db_pdu->ad.delivb.len,
216 db_pdu->ad.delivb.body, 0);
217 /* the receiving half will decrement ++(int)db_pdu->ad.delivb.acks */
218 ++(db_pdu->ad.delivb.acks); /* number of ACKs pending due to MESSAGEs sent */
219 } else {
220 D("Can not deliver. entity(%s) not connected at the moment?", ent->eid);
221 ent->chs[ch_num] = HI_SUBS_PEND;
222 ++(db_pdu->ad.delivb.nacks);
223 }
224 }
225 UNLOCK(hit->shf->ent_mut, "deliver");
226 #if 0
227 if (db_pdu->ad.delivb.acks) /* still something pending? */
228 hi_todo_produce(hit, &db_pdu->qel, "deliv-bitch-again", 0);
229 else
230 hi_free_req(hit, db_pdu, "db_pdu ");
231 #else
232 /* No rescheduling. Operate in one-shot mode: all connected ones get delivery attempt.
233 * The cleanup will happen when last ACK is received and db_pdu->ad.delivb.acks
234 * count has dropped to zero. Redelivery attempts later are handled separately. */
235 #endif
236 }
237
238 /*() Schedule new delivery to happen. See stomp_msg_deliver() for what happens next.
239 * We create a synthetic PDU which is scheduled for the delivery
240 * work in todo queue. This PDU is not associated to any particular
241 * io object (and will keep on rescheduling itself until its job
242 * has been done (??? may be not)). At that point it will free itself.
243 * The acks will be written to ack_fd (to avoid 99% of the double delivery, and
244 * to have an audit trail on our side about deliveries. */
245
246 /* Called by: zxbus_persist */
zxbus_sched_new_delivery(struct hi_thr * hit,struct hi_pdu * req,const char * sha1name,int dest_len,const char * dest)247 static void zxbus_sched_new_delivery(struct hi_thr* hit, struct hi_pdu* req, const char* sha1name, int dest_len, const char* dest)
248 {
249 struct hi_pdu* pdu = hi_pdu_alloc(hit, "deliv-bitch");
250 pdu->qel.kind = HI_PDU_DIST;
251 memcpy(pdu->m, req->m, req->need); /* copy PDU substance */
252 pdu->ap += req->need;
253 pdu->ad.delivb.len = req->ad.delivb.len;
254 pdu->ad.delivb.body = pdu->m + (req->ad.stomp.body - req->m);
255 pdu->ad.delivb.dest = pdu->m + (req->ad.stomp.dest - req->m);
256
257 pdu->ad.delivb.acks = 0;
258 pdu->ad.delivb.nacks = 0;
259
260 // | O_DIRECT -- seems to give alignment problems, i.e. 22 EINVAL Invalid Argument
261 pdu->ad.delivb.ack_fd = open_fd_from_path(O_CREAT | O_WRONLY | O_APPEND | O_SYNC, 0666, "sched deliv", 1, "%s" ZXBUS_CH_DIR "%.*s/.ack/%s", zxbus_path, dest_len, dest, sha1name);
262 hi_todo_produce(hit, &pdu->qel, "deliv-bitch", 0);
263 }
264
265 /*() Scan messages in channel directory and schedule pending ones for delivery.
266 * We avoid delivering to listeners that have already received the PDU
267 * by reading in the .ack/SHA1 file and attaching to pending bitch PDU
268 * a linked list of already successful entities. Linked list because
269 * it is simplest, but a hash table could be more effective. */
270
271 /* Called by: zxbus_subscribe */
zxbus_sched_pending_delivery(struct hi_thr * hit,const char * dest)272 void zxbus_sched_pending_delivery(struct hi_thr* hit, const char* dest)
273 {
274 char path[ZXID_MAX_BUF];
275 struct dirent* de;
276 DIR* dir;
277 struct hi_pdu* pdu;
278
279 name_from_path(path, sizeof(path), "%s" ZXBUS_CH_DIR "%s", zxbus_path, dest);
280 dir = opendir(path);
281 if (!dir) {
282 perror("opendir for /var/zxid/bus/ch/DEST (or other if configured)");
283 D("failed path(%s) dest(%s)", path, dest);
284 return;
285 }
286
287 while (de = readdir(dir)) /* iterate over messages in the channel directory */
288 if (de->d_name[0] != '.' && de->d_name[strlen(de->d_name)-1] != '~') { /* ign hidden&backup */
289 if (!(pdu = hi_pdu_alloc(hit, "pend-bitch")))
290 break;
291 pdu->qel.kind = HI_PDU_DIST;
292 pdu->ap += read_all(pdu->lim - pdu->ap, pdu->ap, "pend-bitch", 1,
293 "%s" ZXBUS_CH_DIR "%s/%s", zxbus_path, dest, de->d_name);
294
295 if (stomp_parse_pdu(pdu))
296 continue; /* parse error in PDU */
297
298 pdu->ad.delivb.acks = 0;
299 pdu->ad.delivb.nacks = 0;
300
301 // | O_DIRECT -- seems to give alignment problems, i.e. 22 EINVAL Invalid Argument
302 pdu->ad.delivb.ack_fd = open_fd_from_path(O_CREAT | O_RDWR | O_APPEND | O_SYNC, 0666, "pend", 1, "%s" ZXBUS_CH_DIR "%s/.ack/%s", zxbus_path, dest, de->d_name);
303 zxbus_load_acks(hit, pdu, pdu->ad.delivb.ack_fd);
304 hi_todo_produce(hit, &pdu->qel, "pend-bitch", 0);
305 }
306 closedir(dir);
307 }
308
309 /*() Retire fully delivered message.
310 * The message is moved to .del/ for later removal if it exists,
311 * or just unlinked on the spot.
312 * return:: 0 on fail, 1 on rename to .del, 2 on unlink */
313
314 /* Called by: stomp_got_ack */
zxbus_retire(struct hi_thr * hit,struct hi_pdu * db_pdu)315 int zxbus_retire(struct hi_thr* hit, struct hi_pdu* db_pdu)
316 {
317 int len, dest_len;
318 char c_path[ZXID_MAX_BUF]; /* current channel path */
319 char d_path[ZXID_MAX_BUF]; /* .del path after atomic rename */
320
321 dest_len = strchr(db_pdu->ad.delivb.dest, '\n')-db_pdu->ad.delivb.dest;
322 len = name_from_path(c_path, sizeof(c_path), "%sch/%.*s/", zxbus_path, dest_len, db_pdu->ad.delivb.dest);
323 if (sizeof(c_path)-len < 28+5 /* +5 accounts for d_path having 5 more chars (.del/) */) {
324 ERR("The c_path for retiring exceeds limit. len=%d", len);
325 return 0;
326 }
327 DD("c_path(%s) len=%d", c_path, len);
328 DD("sha1_input(%.*s) len=%d", db_pdu->ap - db_pdu->m, db_pdu->m, db_pdu->ap - db_pdu->m);
329 sha1_safe_base64(c_path+len, db_pdu->ap - db_pdu->m, db_pdu->m);
330 c_path[len+27] = 0;
331 DD("c_path(%s)", c_path);
332
333 len = name_from_path(d_path, sizeof(d_path), "%sch/%.*s/.del/%s", zxbus_path, dest_len, db_pdu->ad.delivb.dest, c_path+len);
334 DD("d_path(%s)", d_path);
335
336 if (!rename(c_path, d_path))
337 return 1;
338
339 D("Retire: Renaming file(%s) to(%s) failed: %d %s. Defaulting to deleting the file altogether. Check permissions and that directories exist if you do not want deletion. For rename(2) to work, directories must be on the same filesystem. euid=%d egid=%d", c_path, d_path, errno, STRERROR(errno), geteuid(), getegid());
340
341 if (!unlink(c_path))
342 return 2;
343
344 ERR("Retire: Renaming file(%s) to(%s) as well as unlinking it failed: %d %s. Check permissions and that directories exist and that they are on the same filesystem. euid=%d egid=%d", c_path, d_path, errno, STRERROR(errno), geteuid(), getegid());
345 return 0;
346 }
347
348 /*() Attempt to presist a message.
349 * Persisting involves synchronous write and an atomic filesystem rename
350 * operation, ala Maildir. The persisted message is a file that contains
351 * the entire STOMP 1.1 PDU including headers and body. Filename is the sha1
352 * hash of the contents of the file.
353 * return:: 0 on failure, 1 on success.
354 * see also:: persist feature in zxbus_listen_msg() */
355
356 /* Called by: stomp_got_send */
zxbus_persist(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req)357 int zxbus_persist(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
358 {
359 int len, dest_len;
360 char* dest;
361 char* nl;
362 char c_path[ZXID_MAX_BUF]; /* channel destination path after atomic rename */
363
364 if (!(dest = req->ad.stomp.dest)) {
365 stomp_err(hit,io,req,"no destination - client error","SEND MUST specify destination header, i.e. channel to send to.");
366 return 0;
367 }
368 nl = memchr(dest, '\n', req->ap - dest);
369 dest_len = nl-dest;
370 DD("persist(%.*s)", dest_len, dest);
371
372 if (!(len = zxbus_persist_msg(zxbus_cf, sizeof(c_path), c_path,
373 dest_len, dest, req->ap-req->m, req->m))) {
374 stomp_err(hit,io,req,"persist failure at server","Unable to persist message. Can not guarantee reliable delivery, therefore rejecting.");
375 /* *** should we make an effort to close the connection? */
376 return 0;
377 }
378 D("persisted at(%s) (%.*s) len=%d", c_path, (int)MIN(req->ap-req->ad.stomp.body, 10), req->ad.stomp.body, (int)(req->ap-req->m));
379 if (verbose) {
380 if (req->ad.stomp.receipt)
381 nl = memchr(req->ad.stomp.receipt, '\n', req->ap - req->ad.stomp.receipt);
382 else
383 nl = 0;
384 printf("FMT0 persist at %s '%.*s' len=%d rcpt(%.*s)\n", c_path, (int)MIN(req->ap-req->ad.stomp.body, 10), req->ad.stomp.body, (int)(req->ap-req->m), nl?((int)(nl-req->ad.stomp.receipt)):0, nl?req->ad.stomp.receipt:"");
385 }
386 zxbus_sched_new_delivery(hit, req, c_path+len-27, dest_len, dest);
387 return 1;
388 }
389
390 /* EOF -- zxbusdist.c */
391