1 /*
2  * jabberd - Jabber Open Source Server
3  * Copyright (c) 2002 Jeremie Miller, Thomas Muldowney,
4  *                    Ryan Eatmon, Robert Norris
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA02111-1307USA
19  */
20 
21 #include "sx.h"
22 
23 /** handler for read data */
_sx_process_read(sx_t s,sx_buf_t buf)24 void _sx_process_read(sx_t s, sx_buf_t buf) {
25     sx_error_t sxe;
26     nad_t nad;
27     char *errstring;
28     int i;
29     int ns, elem;
30 
31     /* Note that buf->len can validly be 0 here, if we got data from
32        the socket but the plugin didn't return anything to us (e.g. a
33        SSL packet was split across a tcp segment boundary) */
34 
35     /* count bytes parsed */
36     s->pbytes += buf->len;
37 
38     /* parse it */
39     if(XML_Parse(s->expat, buf->data, buf->len, 0) == 0) {
40         /* only report error we haven't already */
41         if(!s->fail) {
42             /* parse error */
43             errstring = (char *) XML_ErrorString(XML_GetErrorCode(s->expat));
44 
45             _sx_debug(ZONE, "XML parse error: %s, character %d: %.*s",
46                       errstring, XML_GetCurrentByteIndex(s->expat) - s->tbytes, buf->len, buf->data);
47             _sx_gen_error(sxe, SX_ERR_XML_PARSE, "XML parse error", errstring);
48             _sx_event(s, event_ERROR, (void *) &sxe);
49 
50             _sx_error(s, stream_err_XML_NOT_WELL_FORMED, errstring);
51             _sx_close(s);
52 
53             _sx_buffer_free(buf);
54 
55             return;
56         }
57 
58         /* !!! is this the right thing to do? we should probably set
59          *     s->fail and let the code further down handle it. */
60         _sx_buffer_free(buf);
61 
62         return;
63     }
64 
65     /* check if the stanza size limit is exceeded (it wasn't reset by parser) */
66     if(s->rbytesmax && s->pbytes > s->rbytesmax) {
67         /* parse error */
68         _sx_debug(ZONE, "maximum stanza size (%d) exceeded by reading %d bytes", s->rbytesmax, s->pbytes);
69 
70         errstring = (char *) XML_ErrorString(XML_GetErrorCode(s->expat));
71 
72         _sx_gen_error(sxe, SX_ERR_XML_PARSE, "stream read error", "Maximum stanza size exceeded");
73         _sx_event(s, event_ERROR, (void *) &sxe);
74 
75         _sx_error(s, stream_err_POLICY_VIOLATION, errstring);
76         _sx_close(s);
77 
78         _sx_buffer_free(buf);
79 
80         return;
81     }
82 
83     /* count bytes processed */
84     s->tbytes += buf->len;
85 
86     /* done with the buffer */
87     _sx_buffer_free(buf);
88 
89     /* process completed nads */
90     if(s->state >= state_STREAM)
91         while((nad = jqueue_pull(s->rnadq)) != NULL) {
92             int plugin_error;
93 #ifdef SX_DEBUG
94             const char *out; int len;
95             nad_print(nad, 0, &out, &len);
96             _sx_debug(ZONE, "completed nad: %.*s", len, out);
97 #endif
98 
99             /* check for errors */
100             if(NAD_ENS(nad, 0) >= 0 && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_STREAMS) && strncmp(NAD_NURI(nad, NAD_ENS(nad, 0)), uri_STREAMS, strlen(uri_STREAMS)) == 0 && NAD_ENAME_L(nad, 0) == 5 && strncmp(NAD_ENAME(nad, 0), "error", 5) == 0) {
101 
102                 errstring = NULL;
103 
104                 /* get text error description if available - XMPP 4.7.2 */
105                 if((ns = nad_find_scoped_namespace(nad, uri_STREAM_ERR, NULL)) >= 0)
106                     if((elem = nad_find_elem(nad, 0, ns, "text", 1)) >= 0)
107                         if(NAD_CDATA_L(nad, elem) > 0) {
108                             errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, elem) + 1));
109                             sprintf(errstring, "%.*s", NAD_CDATA_L(nad, elem), NAD_CDATA(nad, elem));
110                         }
111 
112                 /* if not available, look for legacy error text as in <stream:error>description</stream:error> */
113                 if (errstring == NULL && NAD_CDATA_L(nad, 0) > 0) {
114                     errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, 0) + 1));
115                     sprintf(errstring, "%.*s", NAD_CDATA_L(nad, 0), NAD_CDATA(nad, 0));
116                 }
117 
118                 /* if not available, log the whole packet for debugging */
119                 if (errstring == NULL) {
120                     const char *xml;
121                     int xlen;
122 
123                     nad_print(nad, 0, &xml, &xlen);
124                     errstring = (char *) malloc(sizeof(char) * (xlen + 1));
125                     sprintf(errstring, "%.*s", xlen, xml);
126                 }
127 
128                 if(s->state < state_CLOSING) {
129                     _sx_gen_error(sxe, SX_ERR_STREAM, "Stream error", errstring);
130                     _sx_event(s, event_ERROR, (void *) &sxe);
131                     _sx_state(s, state_CLOSING);
132                 }
133 
134                 free(errstring);
135 
136                 nad_free(nad);
137 
138                 break;
139             }
140 
141             /* check for close */
142             if ((s->flags & SX_WEBSOCKET_WRAPPER) && NAD_ENS(nad, 0) >= 0 && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_XFRAMING) && strncmp(NAD_NURI(nad, NAD_ENS(nad, 0)), uri_XFRAMING, strlen(uri_XFRAMING)) == 0 && NAD_ENAME_L(nad, 0) == 5 && strncmp(NAD_ENAME(nad, 0), "close", 5) == 0) {
143                 _sx_debug(ZONE, "<close/> frame @ depth %d", s->depth);
144                 s->fail = 1;
145                 break;
146             }
147 
148             /* run it by the plugins */
149             if(_sx_chain_nad_read(s, nad) == 0)
150                 return;
151 
152             /* now let the plugins process the completed nad */
153             plugin_error = 0;
154             if(s->env != NULL)
155                 for(i = 0; i < s->env->nplugins; i++)
156                     if(s->env->plugins[i]->process != NULL) {
157                         int plugin_ret;
158                         plugin_ret = (s->env->plugins[i]->process)(s, s->env->plugins[i], nad);
159                         if(plugin_ret == 0) {
160                             plugin_error ++;
161                             break;
162                         }
163                     }
164 
165             /* hand it to the app */
166             if ((plugin_error == 0) && (s->state < state_CLOSING))
167                 _sx_event(s, event_PACKET, (void *) nad);
168         }
169 
170     /* something went wrong, bail */
171     if(s->fail) {
172         _sx_close(s);
173 
174         return;
175     }
176 
177     /* stream was closed */
178     if(s->depth < 0 && s->state < state_CLOSING) {
179         /* close the stream if necessary */
180 
181         if(s->state >= state_STREAM_SENT) {
182             if (s->flags & SX_WEBSOCKET_WRAPPER)
183                 jqueue_push(s->wbufq, _sx_buffer_new("<close xmlns='" uri_XFRAMING "' />", sizeof(uri_XFRAMING) + 17, NULL, NULL), 0);
184             else
185                 jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0);
186             s->want_write = 1;
187         }
188 
189         _sx_state(s, state_CLOSING);
190 
191         return;
192     }
193 }
194 
195 /** we can read */
sx_can_read(sx_t s)196 int sx_can_read(sx_t s) {
197     sx_buf_t in, out;
198     int read, ret;
199 
200     assert((int) (s != NULL));
201 
202     /* do we care? */
203     if(!s->want_read && s->state < state_CLOSING)
204         return 0;           /* no more thanks */
205 
206     _sx_debug(ZONE, "%d ready for reading", s->tag);
207 
208     /* new buffer */
209     in = _sx_buffer_new(NULL, 1024, NULL, NULL);
210 
211     /* get them to read stuff */
212     read = _sx_event(s, event_READ, (void *) in);
213 
214     /* bail if something went wrong */
215     if(read < 0) {
216         _sx_buffer_free(in);
217         s->want_read = 0;
218         s->want_write = 0;
219         return 0;
220     }
221 
222     if(read == 0) {
223         /* nothing to read
224          * should never happen because we did get a read event,
225          * thus there is something to read, or error handled
226          * via (read < 0) block before (errors return -1) */
227         _sx_debug(ZONE, "decoded 0 bytes read data - this should not happen");
228         _sx_buffer_free(in);
229 
230     } else {
231         _sx_debug(ZONE, "passed %d read bytes", in->len);
232 
233         /* count bytes read */
234         s->rbytes += in->len;
235 
236         /* make a copy for processing */
237         out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg);
238 
239         /* run it by the plugins */
240         ret = _sx_chain_io_read(s, out);
241 
242         /* check if the stanza size limit is exceeded (it wasn't reset by parser) */
243         if(s->rbytesmax && s->rbytes > s->rbytesmax) {
244             _sx_debug(ZONE, "maximum stanza size (%d) exceeded by reading %d bytes", s->rbytesmax, s->pbytes);
245             /* make it fail */
246             ret = -1;
247         }
248 
249         if(ret <= 0) {
250             if(ret < 0) {
251                 /* permanent failure, its all over */
252                 /* !!! shut down */
253                 s->want_read = s->want_write = 0;
254             }
255 
256             _sx_buffer_free(in);
257             _sx_buffer_free(out);
258 
259             /* done */
260             if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL);
261             return s->want_read;
262         }
263 
264         _sx_buffer_free(in);
265 
266         _sx_debug(ZONE, "decoded read data (%d bytes): %.*s", out->len, out->len, out->data);
267 
268         /* into the parser with you */
269         _sx_process_read(s, out);
270     }
271 
272     /* if we've written everything, and we're closed, then inform the app it can kill us */
273     if(s->want_write == 0 && s->state == state_CLOSING) {
274         _sx_state(s, state_CLOSED);
275         _sx_event(s, event_CLOSED, NULL);
276         return 0;
277     }
278 
279     if(s->state == state_CLOSED)
280         return 0;
281 
282     if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL);
283     return s->want_read;
284 }
285 
286 /** we can write */
_sx_get_pending_write(sx_t s)287 static int _sx_get_pending_write(sx_t s) {
288     sx_buf_t in, out;
289     int ret;
290 
291     assert(s != NULL);
292 
293     if (s->wbufpending != NULL) {
294     /* there's already a pending buffer ready to write */
295     return 0;
296     }
297 
298     /* get the first buffer off the queue */
299     in = jqueue_pull(s->wbufq);
300     if(in == NULL) {
301         /* if there was a write event, and something is interested,
302        we still have to tell the plugins */
303         in = _sx_buffer_new(NULL, 0, NULL, NULL);
304     }
305 
306     /* if there's more to write, we want to make sure we get it */
307     s->want_write = jqueue_size(s->wbufq);
308 
309     /* make a copy for processing */
310     out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg);
311 
312     _sx_debug(ZONE, "encoding %d bytes for writing: %.*s", in->len, in->len, in->data);
313 
314     /* run it by the plugins */
315     ret = _sx_chain_io_write(s, out);
316     if(ret <= 0) {
317         if(ret == -1) {
318             /* temporary failure, push it back on the queue */
319             jqueue_push(s->wbufq, in, (s->wbufq->front != NULL) ? s->wbufq->front->priority : 0);
320             s->want_write = 1;
321         } else {
322             _sx_buffer_free(in);
323         }
324 
325         if(ret == -2) {
326             /* permanent failure, its all over */
327             /* !!! shut down */
328             s->want_read = s->want_write = 0;
329             return -1;
330         }
331 
332         /* done */
333         return 0;
334     }
335 
336     _sx_buffer_free(in);
337 
338     if (out->len == 0)
339     /* if there's nothing to write, then we're done */
340         _sx_buffer_free(out);
341     else
342         s->wbufpending = out;
343 
344     return 0;
345 }
346 
sx_can_write(sx_t s)347 int sx_can_write(sx_t s) {
348     sx_buf_t out;
349     int ret, written;
350 
351     assert((int) (s != NULL));
352 
353     /* do we care? */
354     if(!s->want_write && s->state < state_CLOSING)
355         return 0;           /* no more thanks */
356 
357     _sx_debug(ZONE, "%d ready for writing", s->tag);
358 
359     ret = _sx_get_pending_write(s);
360     if (ret < 0) {
361         /* fatal error */
362         _sx_debug(ZONE, "fatal error after attempt to write on fd %d", s->tag);
363         /* permanent error so inform the app it can kill us */
364         sx_kill(s);
365         return 0;
366     }
367 
368     /* if there's nothing to write, then we're done */
369     if(s->wbufpending == NULL) {
370         if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
371         return s->want_write;
372     }
373 
374     out = s->wbufpending;
375     s->wbufpending = NULL;
376 
377     /* get the callback to do the write */
378     _sx_debug(ZONE, "handing app %d bytes to write", out->len);
379     written = _sx_event(s, event_WRITE, (void *) out);
380 
381     if(written < 0) {
382         /* bail if something went wrong */
383         _sx_buffer_free(out);
384         s->want_read = 0;
385         s->want_write = 0;
386         return 0;
387     } else if(written < out->len) {
388         /* if not fully written, this buffer is still pending */
389         out->len -= written;
390         out->data += written;
391         s->wbufpending = out;
392         s->want_write ++;
393     } else {
394         /* notify */
395         if(out->notify != NULL)
396             (out->notify)(s, out->notify_arg);
397 
398         /* done with this */
399         _sx_buffer_free(out);
400     }
401 
402     /* if we've written everything, and we're closed, then inform the app it can kill us */
403     if(s->want_write == 0 && s->state == state_CLOSING) {
404         _sx_state(s, state_CLOSED);
405         _sx_event(s, event_CLOSED, NULL);
406         return 0;
407     }
408 
409     if(s->state == state_CLOSED)
410         return 0;
411 
412     if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
413     return s->want_write;
414 }
415 
416 /** send a new nad out */
_sx_nad_write(sx_t s,nad_t nad,int elem)417 int _sx_nad_write(sx_t s, nad_t nad, int elem) {
418     const char *out;
419     int len;
420 
421     /* silently drop it if we're closing or closed */
422     if(s->state >= state_CLOSING) {
423         log_debug(ZONE, "stream closed, dropping outgoing packet");
424         nad_free(nad);
425         return 1;
426     }
427 
428     /* run it through the plugins */
429     if(_sx_chain_nad_write(s, nad, elem) == 0)
430         return 1;
431 
432     /* serialise it */
433     nad_print(nad, elem, &out, &len);
434 
435     _sx_debug(ZONE, "queueing for write: %.*s", len, out);
436 
437     /* ready to go */
438     jqueue_push(s->wbufq, _sx_buffer_new(out, len, NULL, NULL), 0);
439 
440     nad_free(nad);
441 
442     /* things to write */
443     s->want_write = 1;
444 
445     return 0;
446 }
447 
448 /** app version */
sx_nad_write_elem(sx_t s,nad_t nad,int elem)449 void sx_nad_write_elem(sx_t s, nad_t nad, int elem) {
450     assert((int) (s != NULL));
451     assert((int) (nad != NULL));
452 
453     if(_sx_nad_write(s, nad, elem) == 1)
454         return;
455 
456     /* things to write */
457     s->want_write = 1;
458     _sx_event(s, event_WANT_WRITE, NULL);
459 
460     if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
461 }
462 
463 /** send raw data out */
_sx_raw_write(sx_t s,const char * buf,int len)464 int _sx_raw_write(sx_t s, const char *buf, int len) {
465     /* siltently drop it if we're closing or closed */
466     if(s->state >= state_CLOSING) {
467         log_debug(ZONE, "stream closed, dropping outgoing raw data");
468         return 1;
469     }
470 
471     _sx_debug(ZONE, "queuing for write: %.*s", len, buf);
472 
473     /* ready to go */
474     jqueue_push(s->wbufq, _sx_buffer_new(buf, len, NULL, NULL), 0);
475 
476     /* things to write */
477     s->want_write = 1;
478 
479     return 0;
480 }
481 
482 /** app version */
sx_raw_write(sx_t s,const char * buf,int len)483 void sx_raw_write(sx_t s, const char *buf, int len) {
484     assert((int) (s != NULL));
485     assert((int) (buf != NULL));
486     assert(len);
487 
488     if(_sx_raw_write(s, buf, len) == 1)
489         return;
490 
491     /* things to write */
492     s->want_write = 1;
493     _sx_event(s, event_WANT_WRITE, NULL);
494 
495     if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
496 }
497 
498 /** close a stream */
_sx_close(sx_t s)499 void _sx_close(sx_t s) {
500     /* close the stream if necessary */
501     if(s->state >= state_STREAM_SENT) {
502         if (s->flags & SX_WEBSOCKET_WRAPPER)
503             jqueue_push(s->wbufq, _sx_buffer_new("<close xmlns='" uri_XFRAMING "' />", sizeof(uri_XFRAMING) + 17, NULL, NULL), 0);
504         else
505             jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0);
506         s->want_write = 1;
507     }
508 
509     _sx_state(s, state_CLOSING);
510 }
511 
sx_close(sx_t s)512 void sx_close(sx_t s) {
513     assert((int) (s != NULL));
514 
515     if(s->state >= state_CLOSING)
516         return;
517 
518     if(s->state >= state_STREAM_SENT && s->state < state_CLOSING) {
519         _sx_close(s);
520         _sx_event(s, event_WANT_WRITE, NULL);
521     } else {
522         _sx_state(s, state_CLOSED);
523         _sx_event(s, event_CLOSED, NULL);
524     }
525 }
526 
sx_kill(sx_t s)527 void sx_kill(sx_t s) {
528     assert((int) (s != NULL));
529 
530     _sx_state(s, state_CLOSED);
531     _sx_event(s, event_CLOSED, NULL);
532 }
533