1 /*
2 * jabberd - Jabber Open Source Server
3 * Copyright (c) 2002 Jeremie Miller, Thomas Muldowney,
4 * Ryan Eatmon, Robert Norris
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "sm.h"
22
23 /** @file sm/sm.c
24 * @brief stream / io callbacks
25 * @author Robert Norris
26 * $Date: 2005/08/17 07:48:28 $
27 * $Revision: 1.51 $
28 */
29
/**
 * Flag recording that the router connection has gone away.
 * sm_mio_callback() sets it to 1 when the mio close action fires; nothing in
 * the visible part of this file reads or clears it.
 * NOTE(review): presumably polled by the main loop to drive a reconnect - confirm.
 */
sig_atomic_t sm_lost_router = 0;
31
32 /** our master callback */
sm_sx_callback(sx_t s,sx_event_t e,void * data,void * arg)33 int sm_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
34 sm_t sm = (sm_t) arg;
35 sx_buf_t buf = (sx_buf_t) data;
36 sx_error_t *sxe;
37 nad_t nad;
38 pkt_t pkt;
39 int len, ns, elem, attr;
40 char *domain;
41
42 switch(e) {
43 case event_WANT_READ:
44 log_debug(ZONE, "want read");
45 mio_read(sm->mio, sm->fd);
46 break;
47
48 case event_WANT_WRITE:
49 log_debug(ZONE, "want write");
50 mio_write(sm->mio, sm->fd);
51 break;
52
53 case event_READ:
54 log_debug(ZONE, "reading from %d", sm->fd->fd);
55
56 /* do the read */
57 len = recv(sm->fd->fd, buf->data, buf->len, 0);
58
59 if (len < 0) {
60 if (MIO_WOULDBLOCK) {
61 buf->len = 0;
62 return 0;
63 }
64
65 log_write(sm->log, LOG_NOTICE, "[%d] [router] read error: %s (%d)", sm->fd->fd, MIO_STRERROR(MIO_ERROR), MIO_ERROR);
66
67 sx_kill(s);
68
69 return -1;
70 }
71
72 else if (len == 0) {
73 /* they went away */
74 sx_kill(s);
75
76 return -1;
77 }
78
79 log_debug(ZONE, "read %d bytes", len);
80
81 buf->len = len;
82
83 return len;
84
85 case event_WRITE:
86 log_debug(ZONE, "writing to %d", sm->fd->fd);
87
88 len = send(sm->fd->fd, buf->data, buf->len, 0);
89 if (len >= 0) {
90 log_debug(ZONE, "%d bytes written", len);
91 return len;
92 }
93
94 if (MIO_WOULDBLOCK)
95 return 0;
96
97 log_write(sm->log, LOG_NOTICE, "[%d] [router] write error: %s (%d)", sm->fd->fd, MIO_STRERROR(MIO_ERROR), MIO_ERROR);
98
99 sx_kill(s);
100
101 return -1;
102
103 case event_ERROR:
104 sxe = (sx_error_t *) data;
105 log_write(sm->log, LOG_NOTICE, "error from router: %s (%s)", sxe->generic, sxe->specific);
106
107 if(sxe->code == SX_ERR_AUTH)
108 sx_close(s);
109
110 break;
111
112 case event_STREAM:
113 break;
114
115 case event_OPEN:
116 log_write(sm->log, LOG_NOTICE, "connection to router established");
117
118 /* set connection attempts counter */
119 sm->retry_left = sm->retry_lost;
120
121 nad = nad_new();
122 ns = nad_add_namespace(nad, uri_COMPONENT, NULL);
123 nad_append_elem(nad, ns, "bind", 0);
124 nad_append_attr(nad, -1, "name", sm->id);
125 log_debug(ZONE, "requesting component bind for '%s'", sm->id);
126 sx_nad_write(sm->router, nad);
127
128 if(xhash_iter_first(sm->hosts))
129 do {
130 xhash_iter_get(sm->hosts, (void *) &domain, &len, NULL);
131
132 /* skip already requested SM id */
133 if (strlen(sm->id) == len && strncmp(sm->id, domain, len) == 0)
134 continue;
135
136 nad = nad_new();
137 ns = nad_add_namespace(nad, uri_COMPONENT, NULL);
138 elem = nad_append_elem(nad, ns, "bind", 0);
139 nad_set_attr(nad, elem, -1, "name", domain, len);
140 nad_append_attr(nad, -1, "multi", "to");
141 log_debug(ZONE, "requesting domain bind for '%.*s'", len, domain);
142 sx_nad_write(sm->router, nad);
143
144 } while(xhash_iter_next(sm->hosts));
145 break;
146
147 case event_PACKET:
148 nad = (nad_t) data;
149
150 /* drop unqualified packets */
151 if (NAD_ENS(nad, 0) < 0) {
152 nad_free(nad);
153 return 0;
154 }
155 /* watch for the features packet */
156 if (s->state == state_STREAM) {
157 if (NAD_NURI_L(nad, NAD_ENS(nad, 0)) != strlen(uri_STREAMS)
158 || strncmp(uri_STREAMS, NAD_NURI(nad, NAD_ENS(nad, 0)), strlen(uri_STREAMS)) != 0
159 || NAD_ENAME_L(nad, 0) != 8 || strncmp("features", NAD_ENAME(nad, 0), 8) != 0) {
160 log_debug(ZONE, "got a non-features packet on an unauth'd stream, dropping");
161 nad_free(nad);
162 return 0;
163 }
164
165 #ifdef HAVE_SSL
166 /* starttls if we can */
167 if (sm->sx_ssl != NULL && s->ssf == 0) {
168 ns = nad_find_scoped_namespace(nad, uri_TLS, NULL);
169 if (ns >= 0) {
170 elem = nad_find_elem(nad, 0, ns, "starttls", 1);
171 if (elem >= 0) {
172 if (sx_ssl_client_starttls(sm->sx_ssl, s, NULL, NULL) == 0) {
173 nad_free(nad);
174 return 0;
175 }
176 log_write(sm->log, LOG_NOTICE, "unable to establish encrypted session with router");
177 }
178 }
179 }
180 #endif
181
182 /* !!! pull the list of mechanisms, and choose the best one.
183 * if there isn't an appropriate one, error and bail */
184
185 /* authenticate */
186 sx_sasl_auth(sm->sx_sasl, s, "jabberd-router", "DIGEST-MD5", sm->router_user, sm->router_pass);
187
188 nad_free(nad);
189 return 0;
190 }
191
192 /* watch for the bind response */
193 if (s->state == state_OPEN && !sm->online) {
194 if (NAD_NURI_L(nad, NAD_ENS(nad, 0)) != strlen(uri_COMPONENT)
195 || strncmp(uri_COMPONENT, NAD_NURI(nad, NAD_ENS(nad, 0)), strlen(uri_COMPONENT)) != 0
196 || NAD_ENAME_L(nad, 0) != 4 || strncmp("bind", NAD_ENAME(nad, 0), 4)) {
197 log_debug(ZONE, "got a packet from router, but we're not online, dropping");
198 nad_free(nad);
199 return 0;
200 }
201
202 /* catch errors */
203 attr = nad_find_attr(nad, 0, -1, "error", NULL);
204 if(attr >= 0) {
205 log_write(sm->log, LOG_NOTICE, "router refused bind request (%.*s)", NAD_AVAL_L(nad, attr), NAD_AVAL(nad, attr));
206 exit(1);
207 }
208
209 log_debug(ZONE, "coming online");
210
211 /* we're online */
212 sm->online = sm->started = 1;
213 log_write(sm->log, LOG_NOTICE, "%s ready for sessions", sm->id);
214
215 nad_free(nad);
216 return 0;
217 }
218
219 log_debug(ZONE, "got a packet");
220
221 pkt = pkt_new(sm, nad);
222 if (pkt == NULL) {
223 log_debug(ZONE, "invalid packet, dropping");
224 return 0;
225 }
226
227 /* go */
228 dispatch(sm, pkt);
229
230 return 0;
231
232 case event_CLOSED:
233 mio_close(sm->mio, sm->fd);
234 sm->fd = NULL;
235 return -1;
236 }
237
238 return 0;
239 }
240
/**
 * mio event handler for the router socket
 *
 * @param m    the mio instance (unused)
 * @param a    the action being reported
 * @param fd   the mio fd the action is for
 * @param data action payload (unused)
 * @param arg  the sm_t
 * @return the result of sx_can_read()/sx_can_write() for READ/WRITE,
 *         0 in every other case
 */
int sm_mio_callback(mio_t m, mio_action_t a, mio_fd_t fd, void *data, void *arg) {
    sm_t sm = (sm_t) arg;
    int nbytes = 0;

    switch (a) {
        case action_READ:
            log_debug(ZONE, "read action on fd %d", fd->fd);

            /* a read action with no bytes pending kills the router stream.
             * a failed FIONREAD is handled the same way: nbytes starts at 0
             * and the return value is checked, so we never branch on an
             * indeterminate value */
            if (ioctl(fd->fd, FIONREAD, &nbytes) < 0 || nbytes == 0) {
                sx_kill(sm->router);
                return 0;
            }

            return sx_can_read(sm->router);

        case action_WRITE:
            log_debug(ZONE, "write action on fd %d", fd->fd);
            return sx_can_write(sm->router);

        case action_CLOSE:
            log_debug(ZONE, "close action on fd %d", fd->fd);
            log_write(sm->log, LOG_NOTICE, "connection to router closed");

            sm_lost_router = 1;

            /* we're offline */
            sm->online = 0;

            break;

        case action_ACCEPT:
            break;
    }

    return 0;
}
278
279 /** send a new action route */
sm_c2s_action(sess_t dest,const char * action,const char * target)280 void sm_c2s_action(sess_t dest, const char *action, const char *target) {
281 nad_t nad;
282 int rns, sns;
283
284 nad = nad_new();
285
286 rns = nad_add_namespace(nad, uri_COMPONENT, NULL);
287 nad_append_elem(nad, rns, "route", 0);
288
289 nad_append_attr(nad, -1, "to", dest->c2s);
290 nad_append_attr(nad, -1, "from", dest->user->sm->id);
291
292 sns = nad_add_namespace(nad, uri_SESSION, "sc");
293 nad_append_elem(nad, sns, "session", 1);
294
295 if (dest->c2s_id[0] != '\0')
296 nad_append_attr(nad, sns, "c2s", dest->c2s_id);
297 if (dest->sm_id[0] != '\0')
298 nad_append_attr(nad, sns, "sm", dest->sm_id);
299
300 nad_append_attr(nad, -1, "action", action);
301 if (target != NULL)
302 nad_append_attr(nad, -1, "target", target);
303
304 log_debug(ZONE,
305 "routing nad to %s from %s c2s %s s2s %s action %s target %s",
306 dest->c2s, dest->user->sm->id, dest->c2s_id, dest->sm_id,
307 action, target);
308
309 sx_nad_write(dest->user->sm->router, nad);
310 }
311
312 /** this is gratuitous, but apache gets one, so why not? */
sm_signature(sm_t sm,const char * str)313 void sm_signature(sm_t sm, const char *str) {
314 if (sm->siglen == 0) {
315 snprintf(&sm->signature[sm->siglen], 2048 - sm->siglen, "%s", str);
316 sm->siglen += strlen(str);
317 } else {
318 snprintf(&sm->signature[sm->siglen], 2048 - sm->siglen, " %s", str);
319 sm->siglen += strlen(str) + 1;
320 }
321 }
322
323 /** register a new global ns */
sm_register_ns(sm_t sm,const char * uri)324 int sm_register_ns(sm_t sm, const char *uri) {
325 int ns_idx;
326
327 ns_idx = (int) (long) xhash_get(sm->xmlns, uri);
328 if (ns_idx == 0) {
329 ns_idx = xhash_count(sm->xmlns) + 2;
330 xhash_put(sm->xmlns, pstrdup(xhash_pool(sm->xmlns), uri), (void *) (long) ns_idx);
331 }
332 xhash_put(sm->xmlns_refcount, uri, (void *) ((long) xhash_get(sm->xmlns_refcount, uri) + 1));
333
334 return ns_idx;
335 }
336
337 /** unregister a global ns */
sm_unregister_ns(sm_t sm,const char * uri)338 void sm_unregister_ns(sm_t sm, const char *uri) {
339 int refcount = (int) (long) xhash_get(sm->xmlns_refcount, uri);
340 if (refcount == 1) {
341 xhash_zap(sm->xmlns, uri);
342 xhash_zap(sm->xmlns_refcount, uri);
343 } else if (refcount > 1) {
344 xhash_put(sm->xmlns_refcount, uri, (void *) ((long) xhash_get(sm->xmlns_refcount, uri) - 1));
345 }
346 }
347
348 /** get a globally registered ns */
sm_get_ns(sm_t sm,const char * uri)349 int sm_get_ns(sm_t sm, const char *uri) {
350 return (int) (long) xhash_get(sm->xmlns, uri);
351 }
352
353 // Rate limit check: Prevent denial-of-service due to excessive database queries
354 // Make sure owner is responsible for the query!
sm_storage_rate_limit(sm_t sm,const char * owner)355 int sm_storage_rate_limit(sm_t sm, const char *owner) {
356 rate_t rt;
357 user_t user;
358 sess_t sess;
359 item_t item;
360
361 if (sm->query_rate_total == 0 || owner == NULL)
362 return FALSE;
363
364 user = xhash_get(sm->users, owner);
365 if (user != NULL) {
366 rt = (rate_t) xhash_get(sm->query_rates, owner);
367 if (rt == NULL) {
368 rt = rate_new(sm->query_rate_total, sm->query_rate_seconds, sm->query_rate_wait);
369 xhash_put(sm->query_rates, pstrdup(xhash_pool(sm->query_rates), owner), (void *) rt);
370 pool_cleanup(xhash_pool(sm->query_rates), (void (*)(void *)) rate_free, rt);
371 }
372
373 if(rate_check(rt) == 0) {
374 log_write(sm->log, LOG_WARNING, "[%s] is being disconnected, too many database queries within %d seconds", owner, sm->query_rate_seconds);
375 user = xhash_get(sm->users, owner);
376 for (sess = user->sessions; sess != NULL; sess = sess->next) {
377 sm_c2s_action(sess, "ended", NULL);
378 }
379 if(xhash_iter_first(user->roster))
380 do {
381 xhash_iter_get(user->roster, NULL, NULL, (void *) &item);
382 if(item->to) {
383 pkt_router(pkt_create(user->sm, "presence", "unavailable", jid_full(item->jid), jid_full(user->jid)));
384 }
385 } while(xhash_iter_next(user->roster));
386 return TRUE;
387 } else {
388 rate_add(rt, 1);
389 }
390 } else {
391 log_debug(ZONE, "Error: could not get user data for %s", owner);
392 }
393 return FALSE;
394 }
395