1 /* zxbussubs.c - Audit Bus subscription management
2 * Copyright (c) 2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3 * This is confidential unpublished proprietary source code of the author.
4 * NO WARRANTY, not even implied warranties. Contains trade secrets.
5 * Distribution prohibited unless authorized in writing. See file COPYING.
6 * Special grant: http.c may be used with zxid open source project under
7 * same licensing terms as zxid itself.
8 * $Id$
9 *
10 * 16.8.2012, created --Sampo
11 * 30.8.2012, added subscription mechanisms --Sampo
12 * 5.9.2012, separated entity management and subscriptions to own source files --Sampo
13 */
14
15 #include "platform.h"
16 #include "errmac.h"
17 #include "akbox.h"
18 #include "hiios.h"
19 #include "hiproto.h"
20 #include <zx/zxidconf.h>
21 #include <zx/zxidutil.h>
22
23 #define __USE_GNU 1 /* for O_DIRECT */
24
25 #include <ctype.h>
26 #include <memory.h>
27 #include <stdlib.h>
28 #include <netinet/in.h> /* htons(3) and friends */
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <errno.h>
33
/* Alias some struct fields for headers that can not be seen together.
 * Each macro makes the name on the left expand to the field name on the
 * right, so several aliases share one underlying field (e.g. receipt and
 * rcpt_id both expand to host).
 * NOTE(review): presumably the right hand names are members of the STOMP
 * header struct reached via req->ad.stomp -- confirm against hiproto.h.
 * N.B. These are plain preprocessor macros: every later occurrence of an
 * aliased identifier in this file, including local variable names, is
 * rewritten. Avoid using them as local names. */
#define receipt host
#define rcpt_id host
#define acpt_vers vers
#define tx_id vers
#define session login
#define subs_id login
#define subsc login
#define server pw
#define ack pw
#define msg_id pw
#define heart_bt dest
#define zx_rcpt_sig dest

extern int verbose; /* defined in option parsing in zxbusd.c */
extern char* zxbus_path; /* Path prefix used in this file to build the ZXBUS_CH_DIR file names */
50
51 /*() Find the channel in shf->chs array.
52 * N.B. The channel composition is fixed at boot time so no locking is needed.
53 * return:: hi_ch pointer on success, 0 on not found */
54
55 /* Called by: stomp_msg_deliver, zxbus_subscribe */
zxbus_find_ch(struct hiios * shf,int len,const char * dest)56 struct hi_ch* zxbus_find_ch(struct hiios* shf, int len, const char* dest)
57 {
58 int n;
59 struct hi_ch* ch;
60 if (len == -1)
61 len = strlen(dest);
62 else if (len == -2)
63 len = strchr(dest, '\n') - dest;
64 for (n = shf->max_chs, ch = shf->chs; n; --n, ++ch) {
65 if (!ch->dest)
66 break;
67 if (!memcmp(ch->dest, dest, len) && ONE_OF_2(dest[len],'\n','\0')) {
68 D("found ch(%s)", ch->dest);
69 return ch;
70 }
71 }
72 D("channel(%.*s) not found", len, dest);
73 return 0;
74 }
75
76 /*() Write subscriptions of a channel.
77 * Called when new subscription is added at run time.
78 * Will walk the entities and subscriptions relating to that channel.
79 * locking:: shf->ent_mut must be held when calling this function
80 */
81
82 /* Called by: zxbus_subscribe */
zxbus_write_ch_subs(struct hiios * shf,struct hi_ch * ch)83 static int zxbus_write_ch_subs(struct hiios* shf, struct hi_ch* ch)
84 {
85 #ifndef PATH_MAX
86 #define PATH_MAX ZXID_MAX_BUF
87 #endif
88 char err_buf[PATH_MAX];
89 int ch_num = ch - shf->chs;
90 struct hi_ent* ent;
91 char buf[ZXID_MAX_BUF];
92 FILE* out;
93
94 D("writing .subs for ch(%s) ch_num=%d", ch->dest, ch_num);
95 name_from_path(buf, sizeof(buf), "%s" ZXBUS_CH_DIR "%s/.subs", zxbus_path, ch->dest);
96 if (!(out = fopen(buf, "wb"))) {
97 perror("open");
98 ERR("writing subscriptions: File(%s) not writable errno=%d err(%s). euid=%d egid=%d cwd(%s)", buf, errno, STRERROR(errno), geteuid(), getegid(), getcwd(err_buf, sizeof(err_buf)));
99 return 0;
100 }
101
102 for (ent = shf->ents; ent; ent = ent->n)
103 if (ent->chs[ch_num]) {
104 D("eid(%s)", ent->eid);
105 fprintf(out, "%s\n", ent->eid);
106 }
107 fclose(out);
108 return 1;
109 }
110
111 /*() Load subscriptions of a channel. Called once at startup.
112 * N.B. The channel composition is fixed at boot time so no locking is needed. */
113
114 /* Called by: zxbus_load_subs */
zxbus_load_ch_subs(struct hiios * shf,struct hi_ch * ch)115 static int zxbus_load_ch_subs(struct hiios* shf, struct hi_ch* ch)
116 {
117 int ch_num = ch - shf->chs;
118 char* buf;
119 char* p;
120 char* nl;
121 struct hi_ent* ent;
122
123 D("Loading subs for ch(%s) ch_num=%d", ch->dest, ch_num);
124 buf = p = read_all_malloc("load_ch_subs",1,0, "%s" ZXBUS_CH_DIR "%s/.subs", zxbus_path, ch->dest);
125 if (!p)
126 return 0;
127 while (nl = strchr(p, '\n')) {
128 *nl = 0;
129 if (ent = zxbus_load_ent(shf, -1, p)) {
130 ent->chs[ch_num] = HI_SUBS;
131 } else {
132 ERR("entity(%s) does not exist, in %s/.subs", p, ch->dest);
133 }
134 p = nl+1;
135 }
136 FREE(buf);
137 return 1;
138 }
139
140 /*() Load subscriptions of all channels. Called once at startup.
141 * N.B. The channel composition is fixed at boot time so no locking is needed. */
142
143 /* Called by: zxbusd_main */
zxbus_load_subs(struct hiios * shf)144 int zxbus_load_subs(struct hiios* shf)
145 {
146 char path[ZXID_MAX_BUF];
147 struct dirent* de;
148 DIR* dir;
149 struct hi_ch* ch = shf->chs;
150 int n = 0;
151
152 name_from_path(path, sizeof(path), "%s" ZXBUS_CH_DIR, zxbus_path);
153 dir = opendir(path);
154 if (!dir) {
155 perror("opendir for /var/zxid/bus/ch/ (or other if configured)");
156 D("failed path(%s)", path);
157 return 0;
158 }
159
160 while (de = readdir(dir))
161 if (de->d_name[0] != '.' && de->d_name[strlen(de->d_name)-1] != '~') { /* ign hidden&backup */
162 if (++n > shf->max_chs) {
163 ERR("More channels in directory(%s) than fit in array. Consider increasing -nch", path);
164 break;
165 }
166 ch->dest = strdup(de->d_name);
167 zxbus_load_ch_subs(shf, ch++);
168 }
169 closedir(dir);
170 return 1;
171 }
172
173 /*() Persist a subscription and book it into data structure.
174 * Returns:: 1 on success, 0 on failure. */
175
176 /* Called by: stomp_got_subsc */
zxbus_subscribe(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req)177 int zxbus_subscribe(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req)
178 {
179 struct hi_ch* ch;
180 struct hi_ent* ent;
181
182 if (!req || !req->ad.stomp.dest || !*req->ad.stomp.dest) {
183 ERR("Subscription missing destination %p", req);
184 return 0;
185 }
186
187 LOCK(io->qel.mut, "login");
188 D("LK&UNLK io(%x)->qel.mut->thr=%lx (%s:%d)", io->fd, (long)io->qel.mut.thr, io->qel.mut.func, io->qel.mut.line);
189 ent = io->ent;
190 UNLOCK(io->qel.mut, "login");
191 if (!ent) {
192 ERR("No entity associated with io_%p", io);
193 return 0;
194 }
195
196 ch = zxbus_find_ch(hit->shf, -2, req->ad.stomp.dest); /* Check that the channel exists. */
197 if (!ch) {
198 ERR("%s: attempted subscription to nonexistent channel(%.*s)", ent->eid, (int)(strchr(req->ad.stomp.dest, '\n') - req->ad.stomp.dest), req->ad.stomp.dest);
199 return 0;
200 }
201
202 /* N.B. The receipt needs to be sent before registering subscription and
203 * scheduling pending deliveries, lest the simple listener clients
204 * get confused by seeing a MESSAGE when expecting RECEIPT. */
205 stomp_send_receipt(hit, io, req);
206
207 /* Check whether entity is already subscribed. The channel arrays are
208 * in alignment so we only need to look at the corresponding slot. */
209
210 LOCK(hit->shf->ent_mut, "subscribe");
211 D("LOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
212 if (ent->chs[ch - hit->shf->chs]) {
213 ent->chs[ch - hit->shf->chs] = HI_SUBS_ON;
214 D("UNLOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
215 UNLOCK(hit->shf->ent_mut, "subscribed");
216 D("Already subscribed to(%s)", ch->dest);
217 } else {
218 ent->chs[ch - hit->shf->chs] = HI_SUBS_ON;
219 zxbus_write_ch_subs(hit->shf, ch);
220 D("UNLOCK ent_mut->thr=%lx (%s:%d)", (long)hit->shf->ent_mut.thr, hit->shf->ent_mut.func, hit->shf->ent_mut.line);
221 UNLOCK(hit->shf->ent_mut, "subscribe2");
222 }
223 zxbus_sched_pending_delivery(hit, ch->dest);
224 return 1;
225 }
226
227 /* EOF -- zxbussubs.c */
228