1 /*
2 * jabberd - Jabber Open Source Server
3 * Copyright (c) 2002 Jeremie Miller, Thomas Muldowney,
4 * Ryan Eatmon, Robert Norris
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA02111-1307USA
19 */
20
21 #include "sx.h"
22
23 /** handler for read data */
_sx_process_read(sx_t s,sx_buf_t buf)24 void _sx_process_read(sx_t s, sx_buf_t buf) {
25 sx_error_t sxe;
26 nad_t nad;
27 char *errstring;
28 int i;
29 int ns, elem;
30
31 /* Note that buf->len can validly be 0 here, if we got data from
32 the socket but the plugin didn't return anything to us (e.g. a
33 SSL packet was split across a tcp segment boundary) */
34
35 /* count bytes parsed */
36 s->pbytes += buf->len;
37
38 /* parse it */
39 if(XML_Parse(s->expat, buf->data, buf->len, 0) == 0) {
40 /* only report error we haven't already */
41 if(!s->fail) {
42 /* parse error */
43 errstring = (char *) XML_ErrorString(XML_GetErrorCode(s->expat));
44
45 _sx_debug(ZONE, "XML parse error: %s, character %d: %.*s",
46 errstring, XML_GetCurrentByteIndex(s->expat) - s->tbytes, buf->len, buf->data);
47 _sx_gen_error(sxe, SX_ERR_XML_PARSE, "XML parse error", errstring);
48 _sx_event(s, event_ERROR, (void *) &sxe);
49
50 _sx_error(s, stream_err_XML_NOT_WELL_FORMED, errstring);
51 _sx_close(s);
52
53 _sx_buffer_free(buf);
54
55 return;
56 }
57
58 /* !!! is this the right thing to do? we should probably set
59 * s->fail and let the code further down handle it. */
60 _sx_buffer_free(buf);
61
62 return;
63 }
64
65 /* check if the stanza size limit is exceeded (it wasn't reset by parser) */
66 if(s->rbytesmax && s->pbytes > s->rbytesmax) {
67 /* parse error */
68 _sx_debug(ZONE, "maximum stanza size (%d) exceeded by reading %d bytes", s->rbytesmax, s->pbytes);
69
70 errstring = (char *) XML_ErrorString(XML_GetErrorCode(s->expat));
71
72 _sx_gen_error(sxe, SX_ERR_XML_PARSE, "stream read error", "Maximum stanza size exceeded");
73 _sx_event(s, event_ERROR, (void *) &sxe);
74
75 _sx_error(s, stream_err_POLICY_VIOLATION, errstring);
76 _sx_close(s);
77
78 _sx_buffer_free(buf);
79
80 return;
81 }
82
83 /* count bytes processed */
84 s->tbytes += buf->len;
85
86 /* done with the buffer */
87 _sx_buffer_free(buf);
88
89 /* process completed nads */
90 if(s->state >= state_STREAM)
91 while((nad = jqueue_pull(s->rnadq)) != NULL) {
92 int plugin_error;
93 #ifdef SX_DEBUG
94 const char *out; int len;
95 nad_print(nad, 0, &out, &len);
96 _sx_debug(ZONE, "completed nad: %.*s", len, out);
97 #endif
98
99 /* check for errors */
100 if(NAD_ENS(nad, 0) >= 0 && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_STREAMS) && strncmp(NAD_NURI(nad, NAD_ENS(nad, 0)), uri_STREAMS, strlen(uri_STREAMS)) == 0 && NAD_ENAME_L(nad, 0) == 5 && strncmp(NAD_ENAME(nad, 0), "error", 5) == 0) {
101
102 errstring = NULL;
103
104 /* get text error description if available - XMPP 4.7.2 */
105 if((ns = nad_find_scoped_namespace(nad, uri_STREAM_ERR, NULL)) >= 0)
106 if((elem = nad_find_elem(nad, 0, ns, "text", 1)) >= 0)
107 if(NAD_CDATA_L(nad, elem) > 0) {
108 errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, elem) + 1));
109 sprintf(errstring, "%.*s", NAD_CDATA_L(nad, elem), NAD_CDATA(nad, elem));
110 }
111
112 /* if not available, look for legacy error text as in <stream:error>description</stream:error> */
113 if (errstring == NULL && NAD_CDATA_L(nad, 0) > 0) {
114 errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, 0) + 1));
115 sprintf(errstring, "%.*s", NAD_CDATA_L(nad, 0), NAD_CDATA(nad, 0));
116 }
117
118 /* if not available, log the whole packet for debugging */
119 if (errstring == NULL) {
120 const char *xml;
121 int xlen;
122
123 nad_print(nad, 0, &xml, &xlen);
124 errstring = (char *) malloc(sizeof(char) * (xlen + 1));
125 sprintf(errstring, "%.*s", xlen, xml);
126 }
127
128 if(s->state < state_CLOSING) {
129 _sx_gen_error(sxe, SX_ERR_STREAM, "Stream error", errstring);
130 _sx_event(s, event_ERROR, (void *) &sxe);
131 _sx_state(s, state_CLOSING);
132 }
133
134 free(errstring);
135
136 nad_free(nad);
137
138 break;
139 }
140
141 /* check for close */
142 if ((s->flags & SX_WEBSOCKET_WRAPPER) && NAD_ENS(nad, 0) >= 0 && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_XFRAMING) && strncmp(NAD_NURI(nad, NAD_ENS(nad, 0)), uri_XFRAMING, strlen(uri_XFRAMING)) == 0 && NAD_ENAME_L(nad, 0) == 5 && strncmp(NAD_ENAME(nad, 0), "close", 5) == 0) {
143 _sx_debug(ZONE, "<close/> frame @ depth %d", s->depth);
144 s->fail = 1;
145 break;
146 }
147
148 /* run it by the plugins */
149 if(_sx_chain_nad_read(s, nad) == 0)
150 return;
151
152 /* now let the plugins process the completed nad */
153 plugin_error = 0;
154 if(s->env != NULL)
155 for(i = 0; i < s->env->nplugins; i++)
156 if(s->env->plugins[i]->process != NULL) {
157 int plugin_ret;
158 plugin_ret = (s->env->plugins[i]->process)(s, s->env->plugins[i], nad);
159 if(plugin_ret == 0) {
160 plugin_error ++;
161 break;
162 }
163 }
164
165 /* hand it to the app */
166 if ((plugin_error == 0) && (s->state < state_CLOSING))
167 _sx_event(s, event_PACKET, (void *) nad);
168 }
169
170 /* something went wrong, bail */
171 if(s->fail) {
172 _sx_close(s);
173
174 return;
175 }
176
177 /* stream was closed */
178 if(s->depth < 0 && s->state < state_CLOSING) {
179 /* close the stream if necessary */
180
181 if(s->state >= state_STREAM_SENT) {
182 if (s->flags & SX_WEBSOCKET_WRAPPER)
183 jqueue_push(s->wbufq, _sx_buffer_new("<close xmlns='" uri_XFRAMING "' />", sizeof(uri_XFRAMING) + 17, NULL, NULL), 0);
184 else
185 jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0);
186 s->want_write = 1;
187 }
188
189 _sx_state(s, state_CLOSING);
190
191 return;
192 }
193 }
194
195 /** we can read */
sx_can_read(sx_t s)196 int sx_can_read(sx_t s) {
197 sx_buf_t in, out;
198 int read, ret;
199
200 assert((int) (s != NULL));
201
202 /* do we care? */
203 if(!s->want_read && s->state < state_CLOSING)
204 return 0; /* no more thanks */
205
206 _sx_debug(ZONE, "%d ready for reading", s->tag);
207
208 /* new buffer */
209 in = _sx_buffer_new(NULL, 1024, NULL, NULL);
210
211 /* get them to read stuff */
212 read = _sx_event(s, event_READ, (void *) in);
213
214 /* bail if something went wrong */
215 if(read < 0) {
216 _sx_buffer_free(in);
217 s->want_read = 0;
218 s->want_write = 0;
219 return 0;
220 }
221
222 if(read == 0) {
223 /* nothing to read
224 * should never happen because we did get a read event,
225 * thus there is something to read, or error handled
226 * via (read < 0) block before (errors return -1) */
227 _sx_debug(ZONE, "decoded 0 bytes read data - this should not happen");
228 _sx_buffer_free(in);
229
230 } else {
231 _sx_debug(ZONE, "passed %d read bytes", in->len);
232
233 /* count bytes read */
234 s->rbytes += in->len;
235
236 /* make a copy for processing */
237 out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg);
238
239 /* run it by the plugins */
240 ret = _sx_chain_io_read(s, out);
241
242 /* check if the stanza size limit is exceeded (it wasn't reset by parser) */
243 if(s->rbytesmax && s->rbytes > s->rbytesmax) {
244 _sx_debug(ZONE, "maximum stanza size (%d) exceeded by reading %d bytes", s->rbytesmax, s->pbytes);
245 /* make it fail */
246 ret = -1;
247 }
248
249 if(ret <= 0) {
250 if(ret < 0) {
251 /* permanent failure, its all over */
252 /* !!! shut down */
253 s->want_read = s->want_write = 0;
254 }
255
256 _sx_buffer_free(in);
257 _sx_buffer_free(out);
258
259 /* done */
260 if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL);
261 return s->want_read;
262 }
263
264 _sx_buffer_free(in);
265
266 _sx_debug(ZONE, "decoded read data (%d bytes): %.*s", out->len, out->len, out->data);
267
268 /* into the parser with you */
269 _sx_process_read(s, out);
270 }
271
272 /* if we've written everything, and we're closed, then inform the app it can kill us */
273 if(s->want_write == 0 && s->state == state_CLOSING) {
274 _sx_state(s, state_CLOSED);
275 _sx_event(s, event_CLOSED, NULL);
276 return 0;
277 }
278
279 if(s->state == state_CLOSED)
280 return 0;
281
282 if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL);
283 return s->want_read;
284 }
285
286 /** we can write */
_sx_get_pending_write(sx_t s)287 static int _sx_get_pending_write(sx_t s) {
288 sx_buf_t in, out;
289 int ret;
290
291 assert(s != NULL);
292
293 if (s->wbufpending != NULL) {
294 /* there's already a pending buffer ready to write */
295 return 0;
296 }
297
298 /* get the first buffer off the queue */
299 in = jqueue_pull(s->wbufq);
300 if(in == NULL) {
301 /* if there was a write event, and something is interested,
302 we still have to tell the plugins */
303 in = _sx_buffer_new(NULL, 0, NULL, NULL);
304 }
305
306 /* if there's more to write, we want to make sure we get it */
307 s->want_write = jqueue_size(s->wbufq);
308
309 /* make a copy for processing */
310 out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg);
311
312 _sx_debug(ZONE, "encoding %d bytes for writing: %.*s", in->len, in->len, in->data);
313
314 /* run it by the plugins */
315 ret = _sx_chain_io_write(s, out);
316 if(ret <= 0) {
317 if(ret == -1) {
318 /* temporary failure, push it back on the queue */
319 jqueue_push(s->wbufq, in, (s->wbufq->front != NULL) ? s->wbufq->front->priority : 0);
320 s->want_write = 1;
321 } else {
322 _sx_buffer_free(in);
323 }
324
325 if(ret == -2) {
326 /* permanent failure, its all over */
327 /* !!! shut down */
328 s->want_read = s->want_write = 0;
329 return -1;
330 }
331
332 /* done */
333 return 0;
334 }
335
336 _sx_buffer_free(in);
337
338 if (out->len == 0)
339 /* if there's nothing to write, then we're done */
340 _sx_buffer_free(out);
341 else
342 s->wbufpending = out;
343
344 return 0;
345 }
346
sx_can_write(sx_t s)347 int sx_can_write(sx_t s) {
348 sx_buf_t out;
349 int ret, written;
350
351 assert((int) (s != NULL));
352
353 /* do we care? */
354 if(!s->want_write && s->state < state_CLOSING)
355 return 0; /* no more thanks */
356
357 _sx_debug(ZONE, "%d ready for writing", s->tag);
358
359 ret = _sx_get_pending_write(s);
360 if (ret < 0) {
361 /* fatal error */
362 _sx_debug(ZONE, "fatal error after attempt to write on fd %d", s->tag);
363 /* permanent error so inform the app it can kill us */
364 sx_kill(s);
365 return 0;
366 }
367
368 /* if there's nothing to write, then we're done */
369 if(s->wbufpending == NULL) {
370 if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
371 return s->want_write;
372 }
373
374 out = s->wbufpending;
375 s->wbufpending = NULL;
376
377 /* get the callback to do the write */
378 _sx_debug(ZONE, "handing app %d bytes to write", out->len);
379 written = _sx_event(s, event_WRITE, (void *) out);
380
381 if(written < 0) {
382 /* bail if something went wrong */
383 _sx_buffer_free(out);
384 s->want_read = 0;
385 s->want_write = 0;
386 return 0;
387 } else if(written < out->len) {
388 /* if not fully written, this buffer is still pending */
389 out->len -= written;
390 out->data += written;
391 s->wbufpending = out;
392 s->want_write ++;
393 } else {
394 /* notify */
395 if(out->notify != NULL)
396 (out->notify)(s, out->notify_arg);
397
398 /* done with this */
399 _sx_buffer_free(out);
400 }
401
402 /* if we've written everything, and we're closed, then inform the app it can kill us */
403 if(s->want_write == 0 && s->state == state_CLOSING) {
404 _sx_state(s, state_CLOSED);
405 _sx_event(s, event_CLOSED, NULL);
406 return 0;
407 }
408
409 if(s->state == state_CLOSED)
410 return 0;
411
412 if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
413 return s->want_write;
414 }
415
416 /** send a new nad out */
_sx_nad_write(sx_t s,nad_t nad,int elem)417 int _sx_nad_write(sx_t s, nad_t nad, int elem) {
418 const char *out;
419 int len;
420
421 /* silently drop it if we're closing or closed */
422 if(s->state >= state_CLOSING) {
423 log_debug(ZONE, "stream closed, dropping outgoing packet");
424 nad_free(nad);
425 return 1;
426 }
427
428 /* run it through the plugins */
429 if(_sx_chain_nad_write(s, nad, elem) == 0)
430 return 1;
431
432 /* serialise it */
433 nad_print(nad, elem, &out, &len);
434
435 _sx_debug(ZONE, "queueing for write: %.*s", len, out);
436
437 /* ready to go */
438 jqueue_push(s->wbufq, _sx_buffer_new(out, len, NULL, NULL), 0);
439
440 nad_free(nad);
441
442 /* things to write */
443 s->want_write = 1;
444
445 return 0;
446 }
447
448 /** app version */
sx_nad_write_elem(sx_t s,nad_t nad,int elem)449 void sx_nad_write_elem(sx_t s, nad_t nad, int elem) {
450 assert((int) (s != NULL));
451 assert((int) (nad != NULL));
452
453 if(_sx_nad_write(s, nad, elem) == 1)
454 return;
455
456 /* things to write */
457 s->want_write = 1;
458 _sx_event(s, event_WANT_WRITE, NULL);
459
460 if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
461 }
462
463 /** send raw data out */
_sx_raw_write(sx_t s,const char * buf,int len)464 int _sx_raw_write(sx_t s, const char *buf, int len) {
465 /* siltently drop it if we're closing or closed */
466 if(s->state >= state_CLOSING) {
467 log_debug(ZONE, "stream closed, dropping outgoing raw data");
468 return 1;
469 }
470
471 _sx_debug(ZONE, "queuing for write: %.*s", len, buf);
472
473 /* ready to go */
474 jqueue_push(s->wbufq, _sx_buffer_new(buf, len, NULL, NULL), 0);
475
476 /* things to write */
477 s->want_write = 1;
478
479 return 0;
480 }
481
482 /** app version */
sx_raw_write(sx_t s,const char * buf,int len)483 void sx_raw_write(sx_t s, const char *buf, int len) {
484 assert((int) (s != NULL));
485 assert((int) (buf != NULL));
486 assert(len);
487
488 if(_sx_raw_write(s, buf, len) == 1)
489 return;
490
491 /* things to write */
492 s->want_write = 1;
493 _sx_event(s, event_WANT_WRITE, NULL);
494
495 if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
496 }
497
498 /** close a stream */
_sx_close(sx_t s)499 void _sx_close(sx_t s) {
500 /* close the stream if necessary */
501 if(s->state >= state_STREAM_SENT) {
502 if (s->flags & SX_WEBSOCKET_WRAPPER)
503 jqueue_push(s->wbufq, _sx_buffer_new("<close xmlns='" uri_XFRAMING "' />", sizeof(uri_XFRAMING) + 17, NULL, NULL), 0);
504 else
505 jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0);
506 s->want_write = 1;
507 }
508
509 _sx_state(s, state_CLOSING);
510 }
511
sx_close(sx_t s)512 void sx_close(sx_t s) {
513 assert((int) (s != NULL));
514
515 if(s->state >= state_CLOSING)
516 return;
517
518 if(s->state >= state_STREAM_SENT && s->state < state_CLOSING) {
519 _sx_close(s);
520 _sx_event(s, event_WANT_WRITE, NULL);
521 } else {
522 _sx_state(s, state_CLOSED);
523 _sx_event(s, event_CLOSED, NULL);
524 }
525 }
526
sx_kill(sx_t s)527 void sx_kill(sx_t s) {
528 assert((int) (s != NULL));
529
530 _sx_state(s, state_CLOSED);
531 _sx_event(s, event_CLOSED, NULL);
532 }
533