1 /* zxbussubs.c  -  Audit Bus subscription management
2  * Copyright (c) 2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: http.c may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 16.8.2012, created --Sampo
11  * 30.8.2012, added subscription mechanisms --Sampo
12  * 5.9.2012,  separated entity management and subscriptions to own source files --Sampo
13  */
14 
15 #include "platform.h"
16 #include "errmac.h"
17 #include "akbox.h"
18 #include "hiios.h"
19 #include "hiproto.h"
20 #include <zx/zxidconf.h>
21 #include <zx/zxidutil.h>
22 
23 #define __USE_GNU 1  /* for O_DIRECT */
24 
25 #include <ctype.h>
26 #include <memory.h>
27 #include <stdlib.h>
28 #include <netinet/in.h> /* htons(3) and friends */
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <errno.h>
33 
34 /* Alias some struct fields for headers that can not be seen together. */
35 #define receipt   host
36 #define rcpt_id   host
37 #define acpt_vers vers
38 #define tx_id     vers
39 #define session   login
40 #define subs_id   login
41 #define subsc     login
42 #define server    pw
43 #define ack       pw
44 #define msg_id    pw
45 #define heart_bt  dest
46 #define zx_rcpt_sig dest
47 
48 extern int verbose;  /* defined in option parsing in zxbusd.c */
49 extern char* zxbus_path;
50 
51 /*() Find the channel in shf->chs array.
52  * N.B. The channel composition is fixed at boot time so no locking is needed.
53  * return:: hi_ch pointer on success, 0 on not found */
54 
55 /* Called by:  stomp_msg_deliver, zxbus_subscribe */
zxbus_find_ch(struct hiios * shf,int len,const char * dest)56 struct hi_ch* zxbus_find_ch(struct hiios* shf, int len, const char* dest)
57 {
58   int n;
59   struct hi_ch* ch;
60   if (len == -1)
61     len = strlen(dest);
62   else if (len == -2)
63     len = strchr(dest, '\n') - dest;
64   for (n = shf->max_chs, ch = shf->chs; n; --n, ++ch) {
65     if (!ch->dest)
66       break;
67     if (!memcmp(ch->dest, dest, len) && ONE_OF_2(dest[len],'\n','\0')) {
68       D("found ch(%s)", ch->dest);
69       return ch;
70     }
71   }
72   D("channel(%.*s) not found", len, dest);
73   return 0;
74 }
75 
76 /*() Write subscriptions of a channel.
77  * Called when new subscription is added at run time.
78  * Will walk the entities and subscriptions relating to that channel.
79  * locking:: shf->ent_mut must be held when calling this function
80  */
81 
82 /* Called by:  zxbus_subscribe */
zxbus_write_ch_subs(struct hiios * shf,struct hi_ch * ch)83 static int zxbus_write_ch_subs(struct hiios* shf, struct hi_ch* ch)
84 {
85 #ifndef PATH_MAX
86 #define PATH_MAX ZXID_MAX_BUF
87 #endif
88   char err_buf[PATH_MAX];
89   int ch_num = ch - shf->chs;
90   struct hi_ent* ent;
91   char buf[ZXID_MAX_BUF];
92   FILE* out;
93 
94   D("writing .subs for ch(%s) ch_num=%d", ch->dest, ch_num);
95   name_from_path(buf, sizeof(buf), "%s" ZXBUS_CH_DIR "%s/.subs", zxbus_path, ch->dest);
96   if (!(out = fopen(buf, "wb"))) {
97     perror("open");
98     ERR("writing subscriptions: File(%s) not writable errno=%d err(%s). euid=%d egid=%d cwd(%s)", buf, errno, STRERROR(errno), geteuid(), getegid(), getcwd(err_buf, sizeof(err_buf)));
99     return 0;
100   }
101 
102   for (ent = shf->ents; ent; ent = ent->n)
103     if (ent->chs[ch_num]) {
104       D("eid(%s)", ent->eid);
105       fprintf(out, "%s\n", ent->eid);
106     }
107   fclose(out);
108   return 1;
109 }
110 
111 /*() Load subscriptions of a channel. Called once at startup.
112  * N.B. The channel composition is fixed at boot time so no locking is needed. */
113 
114 /* Called by:  zxbus_load_subs */
zxbus_load_ch_subs(struct hiios * shf,struct hi_ch * ch)115 static int zxbus_load_ch_subs(struct hiios* shf, struct hi_ch* ch)
116 {
117   int ch_num = ch - shf->chs;
118   char* buf;
119   char* p;
120   char* nl;
121   struct hi_ent* ent;
122 
123   D("Loading subs for ch(%s) ch_num=%d", ch->dest, ch_num);
124   buf = p = read_all_malloc("load_ch_subs",1,0, "%s" ZXBUS_CH_DIR "%s/.subs", zxbus_path, ch->dest);
125   if (!p)
126     return 0;
127   while (nl = strchr(p, '\n')) {
128     *nl = 0;
129     if (ent = zxbus_load_ent(shf, -1, p)) {
130       ent->chs[ch_num] = HI_SUBS;
131     } else {
132       ERR("entity(%s) does not exist, in %s/.subs", p, ch->dest);
133     }
134     p = nl+1;
135   }
136   FREE(buf);
137   return 1;
138 }
139 
140 /*() Load subscriptions of all channels. Called once at startup.
141  * N.B. The channel composition is fixed at boot time so no locking is needed. */
142 
143 /* Called by:  zxbusd_main */
zxbus_load_subs(struct hiios * shf)144 int zxbus_load_subs(struct hiios* shf)
145 {
146   char path[ZXID_MAX_BUF];
147   struct dirent* de;
148   DIR* dir;
149   struct hi_ch* ch = shf->chs;
150   int n = 0;
151 
152   name_from_path(path, sizeof(path), "%s" ZXBUS_CH_DIR, zxbus_path);
153   dir = opendir(path);
154   if (!dir) {
155     perror("opendir for /var/zxid/bus/ch/ (or other if configured)");
156     D("failed path(%s)", path);
157     return 0;
158   }
159 
160   while (de = readdir(dir))
161     if (de->d_name[0] != '.' && de->d_name[strlen(de->d_name)-1] != '~') { /* ign hidden&backup */
162       if (++n > shf->max_chs) {
163 	ERR("More channels in directory(%s) than fit in array. Consider increasing -nch", path);
164 	break;
165       }
166       ch->dest = strdup(de->d_name);
167       zxbus_load_ch_subs(shf, ch++);
168     }
169   closedir(dir);
170   return 1;
171 }
172 
173 /*() Persist a subscription and book it into data structure.
174  * Returns:: 1 on success, 0 on failure. */
175 
176 /* Called by:  stomp_got_subsc */
zxbus_subscribe(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req)177 int zxbus_subscribe(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
178 {
179   struct hi_ch* ch;
180   struct hi_ent* ent;
181 
182   if (!req || !req->ad.stomp.dest || !*req->ad.stomp.dest) {
183     ERR("Subscription missing destination %p", req);
184     return 0;
185   }
186 
187   LOCK(io->qel.mut, "login");
188   D("LK&UNLK io(%x)->qel.mut->thr=%lx (%s:%d)", io->fd, (long)io->qel.mut.thr, io->qel.mut.func, io->qel.mut.line);
189   ent = io->ent;
190   UNLOCK(io->qel.mut, "login");
191   if (!ent) {
192     ERR("No entity associated with io_%p", io);
193     return 0;
194   }
195 
196   ch = zxbus_find_ch(hit->shf, -2, req->ad.stomp.dest);    /* Check that the channel exists. */
197   if (!ch) {
198     ERR("%s: attempted subscription to nonexistent channel(%.*s)", ent->eid, (int)(strchr(req->ad.stomp.dest, '\n') - req->ad.stomp.dest), req->ad.stomp.dest);
199     return 0;
200   }
201 
202   /* N.B. The receipt needs to be sent before registering subscription and
203    * scheduling pending deliveries, lest the simple listener clients
204    * get confused by seeing a MESSAGE when expecting RECEIPT. */
205   stomp_send_receipt(hit, io, req);
206 
207   /* Check whether entity is already subscribed. The channel arrays are
208    * in alignment so we only need to look at the corresponding slot. */
209 
210   LOCK(hit->shf->ent_mut, "subscribe");
211   D("LOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
212   if (ent->chs[ch - hit->shf->chs]) {
213     ent->chs[ch - hit->shf->chs] = HI_SUBS_ON;
214     D("UNLOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
215     UNLOCK(hit->shf->ent_mut, "subscribed");
216     D("Already subscribed to(%s)", ch->dest);
217   } else {
218     ent->chs[ch - hit->shf->chs] = HI_SUBS_ON;
219     zxbus_write_ch_subs(hit->shf, ch);
220     D("UNLOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
221     UNLOCK(hit->shf->ent_mut, "subscribe2");
222   }
223   zxbus_sched_pending_delivery(hit, ch->dest);
224   return 1;
225 }
226 
227 /* EOF  --  zxbussubs.c */
228