/* * This file is part of the Nice GLib ICE library. * * (C) 2010, 2014 Collabora Ltd. * Contact: Philip Withnall * * The contents of this file are subject to the Mozilla Public License Version * 1.1 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * The Original Code is the Nice GLib ICE library. * * The Initial Developers of the Original Code are Collabora Ltd and Nokia * Corporation. All Rights Reserved. * * Contributors: * Youness Alaoui, Collabora Ltd. * Philip Withnall, Collabora Ltd. * * Alternatively, the contents of this file may be used under the terms of the * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which * case the provisions of LGPL are applicable instead of those above. If you * wish to allow use of your version of this file only under the terms of the * LGPL and not to allow others to use your version of this file under the * MPL, indicate your decision by deleting the provisions above and replace * them with the notice and other provisions required by the LGPL. If you do * not delete the provisions above, a recipient may use your version of this * file under either the MPL or the LGPL. */ /* Reproducing license from libjingle for copied code */ /* * libjingle * Copyright 2004--2005, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. 
Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/

/* NOTE(review): the operands of the four system #include directives below,
 * and of the one inside the G_OS_WIN32 guard, are missing in this copy of the
 * file. Restore them before building (the code below uses memcpy(), errno
 * constants and GLib). */
#include
#include
#include
#include

#ifndef G_OS_WIN32
#  include
#endif

#include "pseudotcp.h"
#include "agent-priv.h"

/* GObject class structure; no virtual methods beyond those of GObject. */
struct _PseudoTcpSocketClass {
    GObjectClass parent_class;
};

typedef struct _PseudoTcpSocketPrivate PseudoTcpSocketPrivate;

/* GObject instance structure; all state lives behind the priv pointer. */
struct _PseudoTcpSocket {
    GObject parent;
    PseudoTcpSocketPrivate *priv;
};

G_DEFINE_TYPE (PseudoTcpSocket, pseudo_tcp_socket, G_TYPE_OBJECT);

//////////////////////////////////////////////////////////////////////
// Network Constants
//////////////////////////////////////////////////////////////////////

// Standard MTUs, in descending order; the list is terminated by a 0 entry.
const guint16 PACKET_MAXIMUMS[] = {
  65535,    // Theoretical maximum, Hyperchannel
  32000,    // Nothing
  17914,    // 16Mb IBM Token Ring
  8166,     // IEEE 802.4
  //4464,   // IEEE 802.5 (4Mb max)
  4352,     // FDDI
  //2048,   // Wideband Network
  2002,     // IEEE 802.5 (4Mb recommended)
  //1536,   // Expermental Ethernet Networks
  //1500,   // Ethernet, Point-to-Point (default)
  1492,     // IEEE 802.3
  1006,     // SLIP, ARPANET
  //576,    // X.25 Networks
  //544,    // DEC IP Portal
  //512,    // NETBIOS
  508,      // IEEE 802/Source-Rt Bridge, ARCNET
  296,      // Point-to-Point (low delay)
  //68,     // Official minimum
  0,        // End of list marker
};

// FIXME: This is a reasonable MTU, but we should get it from the lower layer
#define DEF_MTU 1400
#define MAX_PACKET 65532
// Note: we removed lowest level because packet overhead was larger!
#define MIN_PACKET 296

// (+ up to 40 bytes of options?)
#define IP_HEADER_SIZE 20
#define ICMP_HEADER_SIZE 8
#define UDP_HEADER_SIZE 8
// TODO: Make JINGLE_HEADER_SIZE transparent to this code?
// when relay framing is in use
#define JINGLE_HEADER_SIZE 64

//////////////////////////////////////////////////////////////////////
// Global Constants and Functions
//////////////////////////////////////////////////////////////////////
//
//    0                   1                   2                   3
//    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//  0 |                      Conversation Number                      |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//  4 |                        Sequence Number                        |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//  8 |                     Acknowledgment Number                     |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//    |               |   |U|A|P|R|S|F|                               |
// 12 |    Control    |   |R|C|S|S|Y|I|            Window             |
//    |               |   |G|K|H|T|N|N|                               |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 16 |                       Timestamp sending                       |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 20 |                      Timestamp receiving                      |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 24 |                             data                              |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
//////////////////////////////////////////////////////////////////////

#define MAX_SEQ 0xFFFFFFFF
/* Size in bytes of the pseudo-TCP header drawn above (data starts at 24). */
#define HEADER_SIZE 24

/* Bytes of every packet not available for payload:
 * 24 + 8 + 20 + 64 = 116 with the values defined above. */
#define PACKET_OVERHEAD (HEADER_SIZE + UDP_HEADER_SIZE + \
      IP_HEADER_SIZE + JINGLE_HEADER_SIZE)

// MIN_RTO = 1 second (RFC6298, Sec 2.4)
#define MIN_RTO 1000
#define DEF_RTO 1000 /* 1 seconds (RFC 6298 sect 2.1) */
#define MAX_RTO 60000 /* 60 seconds */
#define DEFAULT_ACK_DELAY 100 /* 100 milliseconds */
#define DEFAULT_NO_DELAY FALSE

/* Default receive/send buffer sizes, in bytes (60 KiB and 90 KiB). */
#define DEFAULT_RCV_BUF_SIZE (60 * 1024)
#define DEFAULT_SND_BUF_SIZE (90 * 1024)

/* NOTE: This must fit in 8 bits. This is used on the wire.
*/ typedef enum { /* Google-provided options: */ TCP_OPT_EOL = 0, /* end of list */ TCP_OPT_NOOP = 1, /* no-op */ TCP_OPT_MSS = 2, /* maximum segment size */ TCP_OPT_WND_SCALE = 3, /* window scale factor */ /* libnice extensions: */ TCP_OPT_FIN_ACK = 254, /* FIN-ACK support */ } TcpOption; /* #define FLAG_SYN 0x02 #define FLAG_ACK 0x10 */ /* NOTE: This must fit in 5 bits. This is used on the wire. */ typedef enum { FLAG_NONE = 0, FLAG_FIN = 1 << 0, FLAG_CTL = 1 << 1, FLAG_RST = 1 << 2, } TcpFlags; #define CTL_CONNECT 0 //#define CTL_REDIRECT 1 #define CTL_EXTRA 255 #define CTRL_BOUND 0x80000000 /* Maximum segment lifetime (1 minute). * RFC 793, §3.3 specifies 2 minutes; but Linux uses 1 minute, so let’s go with * that. */ #define TCP_MSL (60 * 1000) // If there are no pending clocks, wake up every 4 seconds #define DEFAULT_TIMEOUT 4000 // If the connection is closed, once per minute #define CLOSED_TIMEOUT (60 * 1000) /* Timeout after reaching the TIME_WAIT state, in milliseconds. * See: RFC 1122, §4.2.2.13. * * XXX: Since we can control the underlying layer’s channel ID, we can guarantee * delayed segments won’t affect subsequent connections, so can radically * shorten the TIME-WAIT timeout (to the extent that it basically doesn’t * exist). It would normally be (2 * TCP_MSL). */ #define TIME_WAIT_TIMEOUT 1 ////////////////////////////////////////////////////////////////////// // Helper Functions ////////////////////////////////////////////////////////////////////// #ifndef G_OS_WIN32 # define min(first, second) ((first) < (second) ? (first) : (second)) # define max(first, second) ((first) > (second) ? 
(first) : (second)) #endif static guint32 bound(guint32 lower, guint32 middle, guint32 upper) { return min (max (lower, middle), upper); } static gboolean time_is_between(guint32 later, guint32 middle, guint32 earlier) { if (earlier <= later) { return ((earlier <= middle) && (middle <= later)); } else { return !((later < middle) && (middle < earlier)); } } static gint32 time_diff(guint32 later, guint32 earlier) { guint32 LAST = 0xFFFFFFFF; guint32 HALF = 0x80000000; if (time_is_between(earlier + HALF, later, earlier)) { if (earlier <= later) { return (long)(later - earlier); } else { return (long)(later + (LAST - earlier) + 1); } } else { if (later <= earlier) { return -(long) (earlier - later); } else { return -(long)(earlier + (LAST - later) + 1); } } } //////////////////////////////////////////////////////// // PseudoTcpFifo works exactly like FifoBuffer in libjingle //////////////////////////////////////////////////////// typedef struct { guint8 *buffer; gsize buffer_length; gsize data_length; gsize read_position; } PseudoTcpFifo; static void pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size) { b->buffer = g_slice_alloc (size); b->buffer_length = size; } static void pseudo_tcp_fifo_clear (PseudoTcpFifo *b) { if (b->buffer) g_slice_free1 (b->buffer_length, b->buffer); b->buffer = NULL; b->buffer_length = 0; } static gsize pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b) { return b->data_length; } static gboolean pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size) { if (b->data_length > size) return FALSE; if (size != b->data_length) { guint8 *buffer = g_slice_alloc (size); gsize copy = b->data_length; gsize tail_copy = min (copy, b->buffer_length - b->read_position); memcpy (buffer, &b->buffer[b->read_position], tail_copy); memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy); g_slice_free1 (b->buffer_length, b->buffer); b->buffer = buffer; b->buffer_length = size; b->read_position = 0; } return TRUE; } static void 
pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size) { g_assert_cmpint (size, <=, b->data_length); b->read_position = (b->read_position + size) % b->buffer_length; b->data_length -= size; } static void pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size) { g_assert_cmpint (size, <=, b->buffer_length - b->data_length); b->data_length += size; } static gsize pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b) { return b->buffer_length - b->data_length; } static gsize pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes, gsize offset) { gsize available = b->data_length - offset; gsize read_position = (b->read_position + offset) % b->buffer_length; gsize copy = min (bytes, available); gsize tail_copy = min(copy, b->buffer_length - read_position); /* EOS */ if (offset >= b->data_length) return 0; memcpy(buffer, &b->buffer[read_position], tail_copy); memcpy(buffer + tail_copy, &b->buffer[0], copy - tail_copy); return copy; } static gsize pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes, gsize offset) { gsize available = b->buffer_length - b->data_length - offset; gsize write_position = (b->read_position + b->data_length + offset) % b->buffer_length; gsize copy = min (bytes, available); gsize tail_copy = min(copy, b->buffer_length - write_position); if (b->data_length + offset >= b->buffer_length) { return 0; } memcpy(&b->buffer[write_position], buffer, tail_copy); memcpy(&b->buffer[0], buffer + tail_copy, copy - tail_copy); return copy; } static gsize pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes) { gsize copy; copy = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0); b->read_position = (b->read_position + copy) % b->buffer_length; b->data_length -= copy; return copy; } static gsize pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes) { gsize copy; copy = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0); b->data_length += copy; return copy; 
}

//////////////////////////////////////////////////////////////////////
// PseudoTcp
//////////////////////////////////////////////////////////////////////

/* Only used if FIN-ACK support is disabled. */
typedef enum {
  SD_NONE,
  SD_GRACEFUL,
  SD_FORCEFUL
} Shutdown;

/* Hint passed to attempt_send() describing why a send is being attempted. */
typedef enum {
  sfNone,
  sfDelayedAck,
  sfImmediateAck,
  sfFin,
  sfRst,
  sfDuplicateAck,
} SendFlags;

/* A segment with the header fields shown in the wire-format diagram
 * (conversation, sequence, acknowledgment, flags, window, timestamps) plus
 * its payload pointer and length. */
typedef struct {
  guint32 conv, seq, ack;
  TcpFlags flags;
  guint16 wnd;
  const gchar * data;
  guint32 len;
  guint32 tsval, tsecr;
} Segment;

/* Send-side segment record (elements of slist / unsent_slist).
 * NOTE(review): xmit looks like a transmission counter — confirm in
 * transmit(). */
typedef struct {
  guint32 seq, len;
  guint8 xmit;
  TcpFlags flags;
} SSegment;

/* Receive-side segment record (elements of rlist). */
typedef struct {
  guint32 seq, len;
} RSegment;

/**
 * ClosedownSource:
 * @CLOSEDOWN_LOCAL: Error detected locally, or connection forcefully closed
 * locally.
 * @CLOSEDOWN_REMOTE: RST segment received from the peer.
 *
 * Reasons for calling closedown().
 *
 * Since: 0.1.8
 */
typedef enum {
  CLOSEDOWN_LOCAL,
  CLOSEDOWN_REMOTE,
} ClosedownSource;

/* Private per-socket state, allocated by pseudo_tcp_socket_init(). */
struct _PseudoTcpSocketPrivate {
  PseudoTcpCallbacks callbacks;

  Shutdown shutdown;  /* only used if !support_fin_ack */
  gboolean shutdown_reads;
  gint error;

  // TCB data
  PseudoTcpState state;
  guint32 conv;
  gboolean bReadEnable, bWriteEnable, bOutgoing;
  guint32 last_traffic;

  // Incoming data
  GList *rlist;
  guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv;
  guint8 rwnd_scale; // Window scale factor
  PseudoTcpFifo rbuf;
  guint32 rcv_fin;  /* sequence number of the received FIN octet, or 0 */

  // Outgoing data
  GQueue slist;
  GQueue unsent_slist;
  guint32 sbuf_len, snd_nxt, snd_wnd, lastsend;
  guint32 snd_una;  /* oldest unacknowledged sequence number */
  guint8 swnd_scale; // Window scale factor
  PseudoTcpFifo sbuf;

  // Maximum segment size, estimated protocol level, largest segment sent
  guint32 mss, msslevel, largest, mtu_advise;
  // Retransmit timer
  guint32 rto_base;

  // Timestamp tracking
  guint32 ts_recent, ts_lastack;

  // Round-trip calculation
  guint32 rx_rttvar, rx_srtt, rx_rto;

  // Congestion avoidance, Fast retransmit/recovery, Delayed ACKs
  guint32 ssthresh, cwnd;
  guint8 dup_acks;
  guint32 recover;
  gboolean fast_recovery;
  guint32 t_ack;  /* time a delayed ack was scheduled; 0 if no acks scheduled */
  guint32 last_acked_ts;

  gboolean use_nagling;
  guint32 ack_delay;

  // This is used by unit tests to test backward compatibility of
  // PseudoTcp implementations that don't support window scaling.
  gboolean support_wnd_scale;

  /* Current time. Typically only used for testing, when non-zero. When zero,
   * the system monotonic clock is used. Units: monotonic milliseconds. */
  guint32 current_time;

  /* This is used by compatible implementations (with the TCP_OPT_FIN_ACK
   * option) to enable correct FIN-ACK connection termination. Defaults to
   * TRUE unless no compatible option is received. */
  gboolean support_fin_ack;
};

/* Wrap-around-aware comparisons: the difference is evaluated in unsigned
 * arithmetic and compared against half the 32-bit range.
 * NOTE(review): this relies on the operands being 32-bit unsigned values —
 * confirm at the call sites. */
#define LARGER(a,b) (((a) - (b) - 1) < (G_MAXUINT32 >> 1))
#define LARGER_OR_EQUAL(a,b) (((a) - (b)) < (G_MAXUINT32 >> 1))
#define SMALLER(a,b) LARGER ((b),(a))
#define SMALLER_OR_EQUAL(a,b) LARGER_OR_EQUAL ((b),(a))

/* properties */
enum
{
  PROP_CONVERSATION = 1,
  PROP_CALLBACKS,
  PROP_STATE,
  PROP_ACK_DELAY,
  PROP_NO_DELAY,
  PROP_RCV_BUF,
  PROP_SND_BUF,
  PROP_SUPPORT_FIN_ACK,
  LAST_PROPERTY
};

/* Forward declarations of the GObject vfuncs and internal helpers. */
static void pseudo_tcp_socket_get_property (GObject *object, guint property_id,
    GValue *value, GParamSpec *pspec);
static void pseudo_tcp_socket_set_property (GObject *object, guint property_id,
    const GValue *value, GParamSpec *pspec);
static void pseudo_tcp_socket_finalize (GObject *object);

static void queue_connect_message (PseudoTcpSocket *self);
static guint32 queue (PseudoTcpSocket *self, const gchar *data,
    guint32 len, TcpFlags flags);
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
    TcpFlags flags, guint32 offset, guint32 len, guint32 now);
static gboolean parse (PseudoTcpSocket *self,
    const guint8 *_header_buf, gsize header_buf_len,
    const guint8 *data_buf, gsize data_buf_len);
static gboolean process(PseudoTcpSocket *self, Segment *seg);
static int transmit(PseudoTcpSocket *self, SSegment *sseg, guint32 now);
static void
attempt_send(PseudoTcpSocket *self, SendFlags sflags);
static void closedown (PseudoTcpSocket *self, guint32 err,
    ClosedownSource source);
static void adjustMTU(PseudoTcpSocket *self);
static void parse_options (PseudoTcpSocket *self, const guint8 *data,
    guint32 len);
static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size);
static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size);
static void set_state (PseudoTcpSocket *self, PseudoTcpState new_state);
static void set_state_established (PseudoTcpSocket *self);
static void set_state_closed (PseudoTcpSocket *self, guint32 err);

static const gchar *pseudo_tcp_state_get_name (PseudoTcpState state);
static gboolean pseudo_tcp_state_has_sent_fin (PseudoTcpState state);
static gboolean pseudo_tcp_state_has_received_fin (PseudoTcpState state);
static gboolean pseudo_tcp_state_has_received_fin_ack (PseudoTcpState state);

// The following logging is for detailed (packet-level) pseudotcp analysis only.
static PseudoTcpDebugLevel debug_level = PSEUDO_TCP_DEBUG_NONE;

/* Log @fmt at debug level if the global debug_level is at least @level.
 * Requires a variable named `self` (a PseudoTcpSocket *) in scope; the
 * socket pointer and state name are prepended to every message.
 * CAUTION: expands to an unbraced `if`, so always wrap uses inside braces
 * when they appear in an if/else body. */
#define DEBUG(level, fmt, ...) \
  if (debug_level >= level) \
    g_log (level == PSEUDO_TCP_DEBUG_NORMAL ? "libnice-pseudotcp" : "libnice-pseudotcp-verbose", G_LOG_LEVEL_DEBUG, "PseudoTcpSocket %p %s: " fmt, \
        self, pseudo_tcp_state_get_name (self->priv->state), ## __VA_ARGS__)

/* Set the process-wide verbosity used by the DEBUG() macro. */
void
pseudo_tcp_set_debug_level (PseudoTcpDebugLevel level)
{
  debug_level = level;
}

/* Return the current time in milliseconds: the override installed with
 * pseudo_tcp_socket_set_time() if it is non-zero, otherwise the GLib
 * monotonic clock (microseconds divided by 1000, truncated to 32 bits). */
static guint32
get_current_time (PseudoTcpSocket *socket)
{
  if (G_UNLIKELY (socket->priv->current_time != 0))
    return socket->priv->current_time;

  return g_get_monotonic_time () / 1000;
}

/* Override the clock used by this socket; pass 0 to return to the
 * monotonic system clock. */
void
pseudo_tcp_socket_set_time (PseudoTcpSocket *self, guint32 current_time)
{
  self->priv->current_time = current_time;
}

/* GObject class initialiser: hooks up the property accessors and finalizer
 * and installs the object properties. */
static void
pseudo_tcp_socket_class_init (PseudoTcpSocketClass *cls)
{
  GObjectClass *object_class = G_OBJECT_CLASS (cls);

  object_class->get_property = pseudo_tcp_socket_get_property;
  object_class->set_property = pseudo_tcp_socket_set_property;
  object_class->finalize = pseudo_tcp_socket_finalize;

  /* Conversation ID: construct-only. */
  g_object_class_install_property (object_class, PROP_CONVERSATION,
      g_param_spec_uint ("conversation", "TCP Conversation ID",
          "The TCP Conversation ID",
          0, G_MAXUINT32, 0,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /* Pointer to a PseudoTcpCallbacks structure; copied by the setter. */
  g_object_class_install_property (object_class, PROP_CALLBACKS,
      g_param_spec_pointer ("callbacks", "PseudoTcp socket callbacks",
          "Structure with the callbacks to call when PseudoTcp events happen",
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /* Read-only connection state. */
  g_object_class_install_property (object_class, PROP_STATE,
      g_param_spec_uint ("state", "PseudoTcp State",
          "The current state (enum PseudoTcpState) of the PseudoTcp socket",
          PSEUDO_TCP_LISTEN, PSEUDO_TCP_CLOSED, PSEUDO_TCP_LISTEN,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_ACK_DELAY,
      g_param_spec_uint ("ack-delay", "ACK Delay",
          "Delayed ACK timeout (in milliseconds)",
          0, G_MAXUINT, DEFAULT_ACK_DELAY,
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_NO_DELAY,
      g_param_spec_boolean ("no-delay", "No Delay",
          "Disable the Nagle algorithm (like the TCP_NODELAY option)",
          DEFAULT_NO_DELAY,
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));

  /* Buffer sizes have a minimum of 1 byte. */
  g_object_class_install_property (object_class, PROP_RCV_BUF,
      g_param_spec_uint ("rcv-buf", "Receive Buffer",
          "Receive Buffer size",
          1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE,
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_SND_BUF,
      g_param_spec_uint ("snd-buf", "Send Buffer",
          "Send Buffer size",
          1, G_MAXUINT, DEFAULT_SND_BUF_SIZE,
          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));

  /**
   * PseudoTcpSocket:support-fin-ack:
   *
   * Whether to support the FIN–ACK extension to the pseudo-TCP protocol for
   * this socket. The extension is only compatible with other libnice pseudo-TCP
   * stacks, and not with Jingle pseudo-TCP stacks. If enabled, support is
   * negotiated on connection setup, so it is safe for a #PseudoTcpSocket with
   * support enabled to be used with one with it disabled, or with a Jingle
   * pseudo-TCP socket which doesn’t support it at all.
   *
   * Support is enabled by default.
* * Since: 0.1.8 */ g_object_class_install_property (object_class, PROP_SUPPORT_FIN_ACK, g_param_spec_boolean ("support-fin-ack", "Support FIN–ACK", "Whether to enable the optional FIN–ACK support.", TRUE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); } static void pseudo_tcp_socket_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec) { PseudoTcpSocket *self = PSEUDO_TCP_SOCKET (object); switch (property_id) { case PROP_CONVERSATION: g_value_set_uint (value, self->priv->conv); break; case PROP_CALLBACKS: g_value_set_pointer (value, (gpointer) &self->priv->callbacks); break; case PROP_STATE: g_value_set_uint (value, self->priv->state); break; case PROP_ACK_DELAY: g_value_set_uint (value, self->priv->ack_delay); break; case PROP_NO_DELAY: g_value_set_boolean (value, !self->priv->use_nagling); break; case PROP_RCV_BUF: g_value_set_uint (value, self->priv->rbuf_len); break; case PROP_SND_BUF: g_value_set_uint (value, self->priv->sbuf_len); break; case PROP_SUPPORT_FIN_ACK: g_value_set_boolean (value, self->priv->support_fin_ack); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void pseudo_tcp_socket_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec) { PseudoTcpSocket *self = PSEUDO_TCP_SOCKET (object); switch (property_id) { case PROP_CONVERSATION: self->priv->conv = g_value_get_uint (value); break; case PROP_CALLBACKS: { PseudoTcpCallbacks *c = g_value_get_pointer (value); self->priv->callbacks = *c; } break; case PROP_ACK_DELAY: self->priv->ack_delay = g_value_get_uint (value); break; case PROP_NO_DELAY: self->priv->use_nagling = !g_value_get_boolean (value); break; case PROP_RCV_BUF: g_return_if_fail (self->priv->state == PSEUDO_TCP_LISTEN); resize_receive_buffer (self, g_value_get_uint (value)); break; case PROP_SND_BUF: g_return_if_fail (self->priv->state == PSEUDO_TCP_LISTEN); resize_send_buffer (self, g_value_get_uint 
(value)); break; case PROP_SUPPORT_FIN_ACK: self->priv->support_fin_ack = g_value_get_boolean (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void pseudo_tcp_socket_finalize (GObject *object) { PseudoTcpSocket *self = PSEUDO_TCP_SOCKET (object); PseudoTcpSocketPrivate *priv = self->priv; GList *i; SSegment *sseg; if (priv == NULL) return; while ((sseg = g_queue_pop_head (&priv->slist))) g_slice_free (SSegment, sseg); g_queue_clear (&priv->unsent_slist); for (i = priv->rlist; i; i = i->next) { RSegment *rseg = i->data; g_slice_free (RSegment, rseg); } g_list_free (priv->rlist); priv->rlist = NULL; pseudo_tcp_fifo_clear (&priv->rbuf); pseudo_tcp_fifo_clear (&priv->sbuf); g_free (priv); self->priv = NULL; if (G_OBJECT_CLASS (pseudo_tcp_socket_parent_class)->finalize) G_OBJECT_CLASS (pseudo_tcp_socket_parent_class)->finalize (object); } static void pseudo_tcp_socket_init (PseudoTcpSocket *obj) { /* Use g_new0, and do not use g_object_set_private because the size of * our private data is too big (150KB+) and the g_slice_allow cannot allocate * it. 
So we handle the private ourselves */
  PseudoTcpSocketPrivate *priv = g_new0 (PseudoTcpSocketPrivate, 1);

  obj->priv = priv;

  priv->shutdown = SD_NONE;
  priv->error = 0;

  /* The buffer lengths must be set before the FIFOs are initialised from
   * them. */
  priv->rbuf_len = DEFAULT_RCV_BUF_SIZE;
  pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len);
  priv->sbuf_len = DEFAULT_SND_BUF_SIZE;
  pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len);

  priv->state = PSEUDO_TCP_LISTEN;
  priv->conv = 0;
  g_queue_init (&priv->slist);
  g_queue_init (&priv->unsent_slist);
  priv->rcv_wnd = priv->rbuf_len;
  priv->rwnd_scale = priv->swnd_scale = 0;
  priv->snd_nxt = 0;
  priv->snd_wnd = 1;
  priv->snd_una = priv->rcv_nxt = 0;
  priv->bReadEnable = TRUE;
  priv->bWriteEnable = FALSE;
  priv->rcv_fin = 0;

  priv->t_ack = 0;

  priv->msslevel = 0;
  priv->largest = 0;
  /* Start from the smallest supported packet size:
   * 296 - 116 = 180 bytes of payload with the constants defined above. */
  priv->mss = MIN_PACKET - PACKET_OVERHEAD;
  priv->mtu_advise = DEF_MTU;

  priv->rto_base = 0;

  /* cwnd depends on mss, ssthresh on rbuf_len; both are set above. */
  priv->cwnd = 2 * priv->mss;
  priv->ssthresh = priv->rbuf_len;
  priv->lastrecv = priv->lastsend = priv->last_traffic = 0;
  priv->bOutgoing = FALSE;

  priv->dup_acks = 0;
  priv->recover = 0;

  priv->last_acked_ts = 0;

  priv->ts_recent = priv->ts_lastack = 0;

  priv->rx_rto = DEF_RTO;
  priv->rx_srtt = priv->rx_rttvar = 0;

  priv->ack_delay = DEFAULT_ACK_DELAY;
  priv->use_nagling = !DEFAULT_NO_DELAY;

  priv->support_wnd_scale = TRUE;
  priv->support_fin_ack = TRUE;
}

/* Create a new socket for conversation ID @conversation.
 * @callbacks is passed through the "callbacks" property, whose setter copies
 * the structure it points to. */
PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
    PseudoTcpCallbacks *callbacks)
{

  return g_object_new (PSEUDO_TCP_SOCKET_TYPE,
      "conversation", conversation,
      "callbacks", callbacks,
      NULL);
}

/* Build and queue the CTL_CONNECT control message, advertising the window
 * scale and FIN-ACK options when the corresponding support flags are set.
 * At most 7 of the 8 bytes of buf are used (1 + 3 + 3). */
static void
queue_connect_message (PseudoTcpSocket *self)
{
  PseudoTcpSocketPrivate *priv = self->priv;
  guint8 buf[8];
  gsize size = 0;

  buf[size++] = CTL_CONNECT;

  /* Each option is encoded as: kind, length, value. */
  if (priv->support_wnd_scale) {
    buf[size++] = TCP_OPT_WND_SCALE;
    buf[size++] = 1;
    buf[size++] = priv->rwnd_scale;
  }

  if (priv->support_fin_ack) {
    buf[size++] = TCP_OPT_FIN_ACK;
    buf[size++] = 1;  /* option length; zero is invalid (RFC 1122, §4.2.2.5) */
    buf[size++] = 0;  /* currently unused */
  }

  /* The send window is set to exactly the size of the connect message
   * before it is queued. */
  priv->snd_wnd = size;

  queue (self, (char *) buf, size, FLAG_CTL);
}
static void queue_fin_message (PseudoTcpSocket *self) { g_assert (self->priv->support_fin_ack); /* FIN segments are always zero-length. */ queue (self, "", 0, FLAG_FIN); } static void queue_rst_message (PseudoTcpSocket *self) { g_assert (self->priv->support_fin_ack); /* RST segments are always zero-length. */ queue (self, "", 0, FLAG_RST); } gboolean pseudo_tcp_socket_connect(PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; if (priv->state != PSEUDO_TCP_LISTEN) { priv->error = EINVAL; return FALSE; } set_state (self, PSEUDO_TCP_SYN_SENT); queue_connect_message (self); attempt_send(self, sfNone); return TRUE; } void pseudo_tcp_socket_notify_mtu(PseudoTcpSocket *self, guint16 mtu) { PseudoTcpSocketPrivate *priv = self->priv; priv->mtu_advise = mtu; if (priv->state == PSEUDO_TCP_ESTABLISHED) { adjustMTU(self); } } void pseudo_tcp_socket_notify_clock(PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; guint32 now = get_current_time (self); if (priv->state == PSEUDO_TCP_CLOSED) return; /* If in the TIME-WAIT state, any delayed segments have passed and the * connection can be considered closed from both ends. * FIXME: This should probably actually compare a timestamp before * operating. */ if (priv->support_fin_ack && priv->state == PSEUDO_TCP_TIME_WAIT) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Notified clock in TIME-WAIT state; closing connection."); set_state_closed (self, 0); } /* If in the LAST-ACK state, resend the FIN because it hasn’t been ACKed yet. * FIXME: This should probably actually compare a timestamp before * operating. 
*/ if (priv->support_fin_ack && priv->state == PSEUDO_TCP_LAST_ACK) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Notified clock in LAST-ACK state; resending FIN segment."); queue_fin_message (self); attempt_send (self, sfFin); } // Check if it's time to retransmit a segment if (priv->rto_base && (time_diff(priv->rto_base + priv->rx_rto, now) <= 0)) { if (g_queue_get_length (&priv->slist) == 0) { g_assert_not_reached (); } else { // Note: (priv->slist.front().xmit == 0)) { // retransmit segments guint32 nInFlight; guint32 rto_limit; int transmit_status; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "timeout retransmit (rto: %u) " "(rto_base: %u) (now: %u) (dup_acks: %u)", priv->rx_rto, priv->rto_base, now, (guint) priv->dup_acks); transmit_status = transmit(self, g_queue_peek_head (&priv->slist), now); if (transmit_status != 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Error transmitting segment. Closing down."); closedown (self, transmit_status, CLOSEDOWN_LOCAL); return; } nInFlight = priv->snd_nxt - priv->snd_una; priv->ssthresh = max(nInFlight / 2, 2 * priv->mss); DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "ssthresh: %u = (nInFlight: %u / 2) + " "2 * mss: %u", priv->ssthresh, nInFlight, priv->mss); //LOG(LS_INFO) << "priv->ssthresh: " << priv->ssthresh << " nInFlight: " << nInFlight << " priv->mss: " << priv->mss; priv->cwnd = priv->mss; // Back off retransmit timer. Note: the limit is lower when connecting. rto_limit = (priv->state < PSEUDO_TCP_ESTABLISHED) ? DEF_RTO : MAX_RTO; priv->rx_rto = min(rto_limit, priv->rx_rto * 2); priv->rto_base = now; priv->recover = priv->snd_nxt; if (priv->dup_acks >= 3) { priv->dup_acks = 0; priv->fast_recovery = FALSE; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "exit recovery on timeout"); } } } // Check if it's time to probe closed windows if ((priv->snd_wnd == 0) && (time_diff(priv->lastsend + priv->rx_rto, now) <= 0)) { if (time_diff(now, priv->lastrecv) >= 15000) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Receive window closed. 
Closing down."); closedown (self, ECONNABORTED, CLOSEDOWN_LOCAL); return; } // probe the window packet(self, priv->snd_nxt - 1, 0, 0, 0, now); priv->lastsend = now; // back off retransmit timer priv->rx_rto = min(MAX_RTO, priv->rx_rto * 2); } // Check if it's time to send delayed acks if (priv->t_ack && (time_diff(priv->t_ack + priv->ack_delay, now) <= 0)) { packet(self, priv->snd_nxt, 0, 0, 0, now); } } gboolean pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self, const gchar * buffer, guint32 len) { gboolean retval; if (len > MAX_PACKET) { //LOG_F(WARNING) << "packet too large"; self->priv->error = EMSGSIZE; return FALSE; } else if (len < HEADER_SIZE) { //LOG_F(WARNING) << "packet too small"; self->priv->error = EINVAL; return FALSE; } /* Hold a reference to the PseudoTcpSocket during parsing, since it may be * closed from within a callback. */ g_object_ref (self); retval = parse (self, (guint8 *) buffer, HEADER_SIZE, (guint8 *) buffer + HEADER_SIZE, len - HEADER_SIZE); g_object_unref (self); return retval; } /* Assume there are two buffers in the given #NiceInputMessage: a 24-byte one * containing the header, and a bigger one for the data. */ gboolean pseudo_tcp_socket_notify_message (PseudoTcpSocket *self, NiceInputMessage *message) { gboolean retval; g_assert_cmpuint (message->n_buffers, >, 0); if (message->n_buffers == 1) return pseudo_tcp_socket_notify_packet (self, message->buffers[0].buffer, message->buffers[0].size); g_assert_cmpuint (message->n_buffers, ==, 2); g_assert_cmpuint (message->buffers[0].size, ==, HEADER_SIZE); if (message->length > MAX_PACKET) { //LOG_F(WARNING) << "packet too large"; return FALSE; } else if (message->length < HEADER_SIZE) { //LOG_F(WARNING) << "packet too small"; return FALSE; } /* Hold a reference to the PseudoTcpSocket during parsing, since it may be * closed from within a callback. 
*/
  g_object_ref (self);
  /* The payload length is the total message length minus the header
   * buffer. */
  retval = parse (self, message->buffers[0].buffer, message->buffers[0].size,
      message->buffers[1].buffer, message->length - message->buffers[0].size);
  g_object_unref (self);

  return retval;
}

/* Compute when pseudo_tcp_socket_notify_clock() next needs to be called.
 *
 * @timeout is in/out: on entry a previously computed deadline (0 or a value
 * in the past is replaced), on exit the earliest relevant deadline in
 * milliseconds on the same clock as get_current_time().
 *
 * Returns FALSE when no further clock notification is needed: a pending
 * non-FIN-ACK shutdown has just been completed via closedown(), or the socket
 * is CLOSED with FIN-ACK support enabled. Returns TRUE otherwise. */
gboolean
pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, guint64 *timeout)
{
  PseudoTcpSocketPrivate *priv = self->priv;
  guint32 now = get_current_time (self);
  gsize snd_buffered;
  guint32 closed_timeout;

  if (priv->shutdown == SD_FORCEFUL) {
    if (priv->support_fin_ack) {
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
          "‘Forceful’ shutdown used when FIN-ACK support is enabled");
    }

    /* Transition to the CLOSED state. */
    closedown (self, 0, CLOSEDOWN_REMOTE);

    return FALSE;
  }

  /* A graceful shutdown completes once the connection is no longer
   * established, or once nothing is left to send and no ACK is pending. */
  snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
  if ((priv->shutdown == SD_GRACEFUL)
      && ((priv->state != PSEUDO_TCP_ESTABLISHED)
          || ((snd_buffered == 0) && (priv->t_ack == 0)))) {
    if (priv->support_fin_ack) {
      DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
          "‘Graceful’ shutdown used when FIN-ACK support is enabled");
    }

    /* Transition to the CLOSED state. */
    closedown (self, 0, CLOSEDOWN_REMOTE);

    return FALSE;
  }

  /* FIN-ACK support. The timeout for closing the socket if nothing is received
   * varies depending on whether the socket is waiting in the TIME-WAIT state
   * for delayed segments to pass.
   *
   * See: http://vincent.bernat.im/en/blog/2014-tcp-time-wait-state-linux.html
   */
  closed_timeout = CLOSED_TIMEOUT;
  if (priv->support_fin_ack && priv->state == PSEUDO_TCP_TIME_WAIT)
    closed_timeout = TIME_WAIT_TIMEOUT;

  if (priv->support_fin_ack && priv->state == PSEUDO_TCP_CLOSED) {
    return FALSE;
  }

  /* Replace an unset or already expired deadline. */
  if (*timeout == 0 || *timeout < now)
    *timeout = now + closed_timeout;

  if (priv->support_fin_ack && priv->state == PSEUDO_TCP_TIME_WAIT) {
    *timeout = min (*timeout, now + TIME_WAIT_TIMEOUT);
    return TRUE;
  }

  if (priv->state == PSEUDO_TCP_CLOSED && !priv->support_fin_ack) {
    *timeout = min (*timeout, now + CLOSED_TIMEOUT);
    return TRUE;
  }

  /* Open connection: wake up at least every DEFAULT_TIMEOUT, or earlier for
   * a pending delayed ACK, retransmission or zero-window probe. */
  *timeout = min (*timeout, now + DEFAULT_TIMEOUT);

  if (priv->t_ack) {
    *timeout = min(*timeout, priv->t_ack + priv->ack_delay);
  }
  if (priv->rto_base) {
    *timeout = min(*timeout, priv->rto_base + priv->rx_rto);
  }
  if (priv->snd_wnd == 0) {
    *timeout = min(*timeout, priv->lastsend + priv->rx_rto);
  }

  return TRUE;
}

/* Read up to @len bytes of received data into @buffer.
 *
 * Returns the number of bytes read; 0 at end of stream (see the checks
 * below) or when @len is 0; -1 on error, with the socket error set to
 * ENOTCONN or EWOULDBLOCK. */
gint
pseudo_tcp_socket_recv(PseudoTcpSocket *self, char * buffer, size_t len)
{
  PseudoTcpSocketPrivate *priv = self->priv;
  gsize bytesread;
  gsize available_space;

  /* Received a FIN from the peer, so return 0. RFC 793, §3.5, Case 2. */
  if (priv->support_fin_ack && priv->shutdown_reads) {
    return 0;
  }

  /* Return 0 if FIN-ACK is not supported but the socket has been closed. */
  if (!priv->support_fin_ack && pseudo_tcp_socket_is_closed (self)) {
    return 0;
  }

  /* Return ENOTCONN if FIN-ACK is not supported and the connection is not
   * ESTABLISHED. */
  if (!priv->support_fin_ack && priv->state != PSEUDO_TCP_ESTABLISHED) {
    priv->error = ENOTCONN;
    return -1;
  }

  if (len == 0)
    return 0;

  bytesread = pseudo_tcp_fifo_read (&priv->rbuf, (guint8 *) buffer, len);

  // If there's no data in |m_rbuf|.
if (bytesread == 0 && !(pseudo_tcp_state_has_received_fin (priv->state) || pseudo_tcp_state_has_received_fin_ack (priv->state))) { priv->bReadEnable = TRUE; priv->error = EWOULDBLOCK; return -1; } available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf); if (available_space - priv->rcv_wnd >= min (priv->rbuf_len / 2, priv->mss)) { // !?! Not sure about this was closed business gboolean bWasClosed = (priv->rcv_wnd == 0); priv->rcv_wnd = available_space; if (bWasClosed) { attempt_send(self, sfImmediateAck); } } return bytesread; } gint pseudo_tcp_socket_send(PseudoTcpSocket *self, const char * buffer, guint32 len) { PseudoTcpSocketPrivate *priv = self->priv; gint written; gsize available_space; if (priv->state != PSEUDO_TCP_ESTABLISHED) { priv->error = pseudo_tcp_state_has_sent_fin (priv->state) ? EPIPE : ENOTCONN; return -1; } available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); if (!available_space) { priv->bWriteEnable = TRUE; priv->error = EWOULDBLOCK; return -1; } written = queue (self, buffer, len, FLAG_NONE); attempt_send(self, sfNone); if (written > 0 && (guint32)written < len) { priv->bWriteEnable = TRUE; } return written; } void pseudo_tcp_socket_close(PseudoTcpSocket *self, gboolean force) { PseudoTcpSocketPrivate *priv = self->priv; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Closing socket %p %s", self, force ? "forcefully" : "gracefully"); /* Forced closure by sending an RST segment. RFC 1122, §4.2.2.13. */ if (force && priv->state != PSEUDO_TCP_CLOSED) { closedown (self, ECONNABORTED, CLOSEDOWN_LOCAL); return; } /* Fall back to shutdown(). */ pseudo_tcp_socket_shutdown (self, PSEUDO_TCP_SHUTDOWN_RDWR); } void pseudo_tcp_socket_shutdown (PseudoTcpSocket *self, PseudoTcpShutdown how) { PseudoTcpSocketPrivate *priv = self->priv; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Shutting down socket %p: %u", self, how); /* FIN-ACK--only stuff below here. 
*/ if (!priv->support_fin_ack) { if (priv->shutdown == SD_NONE) priv->shutdown = SD_GRACEFUL; return; } /* What needs shutting down? */ switch (how) { case PSEUDO_TCP_SHUTDOWN_RD: case PSEUDO_TCP_SHUTDOWN_RDWR: priv->shutdown_reads = TRUE; break; case PSEUDO_TCP_SHUTDOWN_WR: /* Handled below. */ break; default: DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid shutdown method: %u.", how); break; } if (how == PSEUDO_TCP_SHUTDOWN_RD) { return; } /* Unforced write closure. */ switch (priv->state) { case PSEUDO_TCP_LISTEN: case PSEUDO_TCP_SYN_SENT: /* Just abort the connection without completing the handshake. */ set_state_closed (self, 0); break; case PSEUDO_TCP_SYN_RECEIVED: case PSEUDO_TCP_ESTABLISHED: /* Local user initiating the close: RFC 793, §3.5, Cases 1 and 3. * If there is pending receive data, send RST instead of FIN; * see RFC 1122, §4.2.2.13. */ if (pseudo_tcp_socket_get_available_bytes (self) > 0) { closedown (self, ECONNABORTED, CLOSEDOWN_LOCAL); } else { queue_fin_message (self); attempt_send (self, sfFin); set_state (self, PSEUDO_TCP_FIN_WAIT_1); } break; case PSEUDO_TCP_CLOSE_WAIT: /* Remote user initiating the close: RFC 793, §3.5, Case 2. * We’ve previously received a FIN from the peer; now the user is closing * the local end of the connection. */ queue_fin_message (self); attempt_send (self, sfFin); set_state (self, PSEUDO_TCP_LAST_ACK); break; case PSEUDO_TCP_CLOSING: case PSEUDO_TCP_CLOSED: /* Already closed on both sides. */ break; case PSEUDO_TCP_FIN_WAIT_1: case PSEUDO_TCP_FIN_WAIT_2: case PSEUDO_TCP_TIME_WAIT: case PSEUDO_TCP_LAST_ACK: /* Already closed locally. */ break; default: /* Do nothing. 
*/ break; } } int pseudo_tcp_socket_get_error(PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; return priv->error; } // // Internal Implementation // static guint32 queue (PseudoTcpSocket *self, const gchar * data, guint32 len, TcpFlags flags) { PseudoTcpSocketPrivate *priv = self->priv; gsize available_space; available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); if (len > available_space) { g_assert_cmpint (flags, ==, FLAG_NONE); len = available_space; } // We can concatenate data if the last segment is the same type // (control v. regular data), and has not been transmitted yet if (g_queue_get_length (&priv->slist) && (((SSegment *)g_queue_peek_tail (&priv->slist))->flags == flags) && (((SSegment *)g_queue_peek_tail (&priv->slist))->xmit == 0)) { ((SSegment *)g_queue_peek_tail (&priv->slist))->len += len; } else { SSegment *sseg = g_slice_new0 (SSegment); gsize snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); sseg->seq = priv->snd_una + snd_buffered; sseg->len = len; sseg->flags = flags; g_queue_push_tail (&priv->slist, sseg); g_queue_push_tail (&priv->unsent_slist, sseg); } //LOG(LS_INFO) << "PseudoTcp::queue - priv->slen = " << priv->slen; return pseudo_tcp_fifo_write (&priv->sbuf, (guint8*) data, len);; } // Creates a packet and submits it to the network. This method can either // send payload or just an ACK packet. // // |seq| is the sequence number of this packet. // |flags| is the flags for sending this packet. // |offset| is the offset to read from |m_sbuf|. // |len| is the number of bytes to read from |m_sbuf| as payload. If this // value is 0 then this is an ACK packet, otherwise this packet has payload. 
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq, TcpFlags flags, guint32 offset, guint32 len, guint32 now) { PseudoTcpSocketPrivate *priv = self->priv; union { guint8 u8[MAX_PACKET]; guint16 u16[MAX_PACKET / 2]; guint32 u32[MAX_PACKET / 4]; } buffer; PseudoTcpWriteResult wres = WR_SUCCESS; g_assert_cmpuint (HEADER_SIZE + len, <=, MAX_PACKET); *buffer.u32 = htonl(priv->conv); *(buffer.u32 + 1) = htonl(seq); *(buffer.u32 + 2) = htonl(priv->rcv_nxt); buffer.u8[12] = 0; buffer.u8[13] = flags; *(buffer.u16 + 7) = htons((guint16)(priv->rcv_wnd >> priv->rwnd_scale)); // Timestamp computations *(buffer.u32 + 4) = htonl(now); *(buffer.u32 + 5) = htonl(priv->ts_recent); priv->ts_lastack = priv->rcv_nxt; if (len) { gsize bytes_read; bytes_read = pseudo_tcp_fifo_read_offset (&priv->sbuf, buffer.u8 + HEADER_SIZE, len, offset); g_assert_cmpint (bytes_read, ==, len); } DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Sending " "", priv->conv, (unsigned)flags, seq, seq + len, priv->rcv_nxt, priv->rcv_wnd, now % 10000, priv->ts_recent % 10000, len); wres = priv->callbacks.WritePacket(self, (gchar *) buffer.u8, len + HEADER_SIZE, priv->callbacks.user_data); /* Note: When len is 0, this is an ACK packet. We don't read the return value for those, and thus we won't retry. So go ahead and treat the packet as a success (basically simulate as if it were dropped), which will prevent our timers from being messed up. 
*/ if ((wres != WR_SUCCESS) && (0 != len)) return wres; priv->t_ack = 0; if (len > 0) { priv->lastsend = now; } priv->last_traffic = now; priv->bOutgoing = TRUE; return WR_SUCCESS; } static gboolean parse (PseudoTcpSocket *self, const guint8 *_header_buf, gsize header_buf_len, const guint8 *data_buf, gsize data_buf_len) { Segment seg; union { const guint8 *u8; const guint16 *u16; const guint32 *u32; } header_buf; header_buf.u8 = _header_buf; if (header_buf_len != 24) return FALSE; seg.conv = ntohl(*header_buf.u32); seg.seq = ntohl(*(header_buf.u32 + 1)); seg.ack = ntohl(*(header_buf.u32 + 2)); seg.flags = header_buf.u8[13]; seg.wnd = ntohs(*(header_buf.u16 + 7)); seg.tsval = ntohl(*(header_buf.u32 + 4)); seg.tsecr = ntohl(*(header_buf.u32 + 5)); seg.data = (const gchar *) data_buf; seg.len = data_buf_len; DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Received " "", seg.conv, (unsigned)seg.flags, seg.seq, seg.seq + seg.len, seg.ack, seg.wnd, seg.tsval % 10000, seg.tsecr % 10000, seg.len); return process(self, &seg); } /* True iff the @state requires that a FIN has already been sent by this * host. */ static gboolean pseudo_tcp_state_has_sent_fin (PseudoTcpState state) { switch (state) { case PSEUDO_TCP_LISTEN: case PSEUDO_TCP_SYN_SENT: case PSEUDO_TCP_SYN_RECEIVED: case PSEUDO_TCP_ESTABLISHED: case PSEUDO_TCP_CLOSE_WAIT: return FALSE; case PSEUDO_TCP_CLOSED: case PSEUDO_TCP_FIN_WAIT_1: case PSEUDO_TCP_FIN_WAIT_2: case PSEUDO_TCP_CLOSING: case PSEUDO_TCP_TIME_WAIT: case PSEUDO_TCP_LAST_ACK: return TRUE; default: return FALSE; } } /* True iff the @state requires that a FIN has already been received from the * peer. 
*/ static gboolean pseudo_tcp_state_has_received_fin (PseudoTcpState state) { switch (state) { case PSEUDO_TCP_LISTEN: case PSEUDO_TCP_SYN_SENT: case PSEUDO_TCP_SYN_RECEIVED: case PSEUDO_TCP_ESTABLISHED: case PSEUDO_TCP_FIN_WAIT_1: case PSEUDO_TCP_FIN_WAIT_2: return FALSE; case PSEUDO_TCP_CLOSED: case PSEUDO_TCP_CLOSING: case PSEUDO_TCP_TIME_WAIT: case PSEUDO_TCP_CLOSE_WAIT: case PSEUDO_TCP_LAST_ACK: return TRUE; default: return FALSE; } } /* True iff the @state requires that a FIN-ACK has already been received from * the peer. */ static gboolean pseudo_tcp_state_has_received_fin_ack (PseudoTcpState state) { switch (state) { case PSEUDO_TCP_LISTEN: case PSEUDO_TCP_SYN_SENT: case PSEUDO_TCP_SYN_RECEIVED: case PSEUDO_TCP_ESTABLISHED: case PSEUDO_TCP_FIN_WAIT_1: case PSEUDO_TCP_FIN_WAIT_2: case PSEUDO_TCP_CLOSING: case PSEUDO_TCP_CLOSE_WAIT: case PSEUDO_TCP_LAST_ACK: return FALSE; case PSEUDO_TCP_CLOSED: case PSEUDO_TCP_TIME_WAIT: return TRUE; default: return FALSE; } } static gboolean process(PseudoTcpSocket *self, Segment *seg) { PseudoTcpSocketPrivate *priv = self->priv; guint32 now; SendFlags sflags = sfNone; gboolean bIgnoreData; gboolean bNewData; gboolean bConnect = FALSE; gsize snd_buffered; gsize available_space; guint32 kIdealRefillSize; gboolean is_valuable_ack, is_duplicate_ack, is_fin_ack = FALSE; gboolean received_fin = FALSE; /* If this is the wrong conversation, send a reset!?! (with the correct conversation?) */ if (seg->conv != priv->conv) { //if ((seg->flags & FLAG_RST) == 0) { // packet(sock, tcb, seg->ack, 0, FLAG_RST, 0, 0); //} DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "wrong conversation"); return FALSE; } now = get_current_time (self); priv->last_traffic = priv->lastrecv = now; priv->bOutgoing = FALSE; if (priv->state == PSEUDO_TCP_CLOSED || (pseudo_tcp_state_has_received_fin_ack (priv->state) && seg->len > 0)) { /* Send an RST segment. See: RFC 1122, §4.2.2.13; RFC 793, §3.4, point 3, * page 37. 
We can only send RST if we know the peer knows we’re closed; * otherwise this could be a timeout retransmit from them, due to our * packets from data through to FIN being dropped. */ DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Segment received while closed; sending RST."); if ((seg->flags & FLAG_RST) == 0) { closedown (self, 0, CLOSEDOWN_LOCAL); } return FALSE; } // Check if this is a reset segment if (seg->flags & FLAG_RST) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Received RST segment; closing down."); closedown (self, ECONNRESET, CLOSEDOWN_REMOTE); return FALSE; } // Check for control data bConnect = FALSE; if (seg->flags & FLAG_CTL) { if (seg->len == 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Missing control code"); return FALSE; } else if (seg->data[0] == CTL_CONNECT) { bConnect = TRUE; parse_options (self, (guint8 *) &seg->data[1], seg->len - 1); if (priv->state == PSEUDO_TCP_LISTEN) { set_state (self, PSEUDO_TCP_SYN_RECEIVED); queue_connect_message (self); } else if (priv->state == PSEUDO_TCP_SYN_SENT) { set_state_established (self); } } else { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Unknown control code: %u", seg->data[0]); return FALSE; } } // Update timestamp if (SMALLER_OR_EQUAL (seg->seq, priv->ts_lastack) && SMALLER (priv->ts_lastack, seg->seq + seg->len)) { priv->ts_recent = seg->tsval; } // Check if this is a valuable ack is_valuable_ack = (LARGER(seg->ack, priv->snd_una) && SMALLER_OR_EQUAL(seg->ack, priv->snd_nxt)); is_duplicate_ack = (seg->ack == priv->snd_una); if (is_valuable_ack) { guint32 nAcked; guint32 nFree; // Calculate round-trip time if (seg->tsecr) { long rtt = time_diff(now, seg->tsecr); if (rtt >= 0) { if (priv->rx_srtt == 0) { priv->rx_srtt = rtt; priv->rx_rttvar = rtt / 2; } else { priv->rx_rttvar = (3 * priv->rx_rttvar + labs((long)(rtt - priv->rx_srtt))) / 4; priv->rx_srtt = (7 * priv->rx_srtt + rtt) / 8; } priv->rx_rto = bound(MIN_RTO, priv->rx_srtt + max(1LU, 4 * priv->rx_rttvar), MAX_RTO); DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "rtt: %ld srtt: %u rttvar: %u rto: 
%u", rtt, priv->rx_srtt, priv->rx_rttvar, priv->rx_rto); } else { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid RTT: %ld", rtt); return FALSE; } priv->last_acked_ts = seg->tsecr; } priv->snd_wnd = seg->wnd << priv->swnd_scale; nAcked = seg->ack - priv->snd_una; priv->snd_una = seg->ack; priv->rto_base = (priv->snd_una == priv->snd_nxt) ? 0 : now; /* ACKs for FIN segments give an increment on nAcked, but there is no * corresponding byte to read because the FIN segment is empty (it just has * a sequence number). */ if (nAcked == priv->sbuf.data_length + 1 && pseudo_tcp_state_has_sent_fin (priv->state)) { is_fin_ack = TRUE; nAcked--; } pseudo_tcp_fifo_consume_read_data (&priv->sbuf, nAcked); for (nFree = nAcked; nFree > 0; ) { SSegment *data; g_assert_cmpuint (g_queue_get_length (&priv->slist), !=, 0); data = (SSegment *) g_queue_peek_head (&priv->slist); if (nFree < data->len) { data->len -= nFree; data->seq += nFree; nFree = 0; } else { if (data->len > priv->largest) { priv->largest = data->len; } nFree -= data->len; g_slice_free (SSegment, data); g_queue_pop_head (&priv->slist); } } if (priv->dup_acks >= 3) { if (LARGER_OR_EQUAL (priv->snd_una, priv->recover)) { // NewReno guint32 nInFlight = priv->snd_nxt - priv->snd_una; // (Fast Retransmit) priv->cwnd = min(priv->ssthresh, max (nInFlight, priv->mss) + priv->mss); DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "exit recovery cwnd=%d ssthresh=%d nInFlight=%d mss: %d", priv->cwnd, priv->ssthresh, nInFlight, priv->mss); priv->fast_recovery = FALSE; priv->dup_acks = 0; } else { int transmit_status; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "recovery retransmit"); transmit_status = transmit(self, g_queue_peek_head (&priv->slist), now); if (transmit_status != 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Error transmitting recovery retransmit segment. Closing down."); closedown (self, transmit_status, CLOSEDOWN_LOCAL); return FALSE; } priv->cwnd += (nAcked > priv->mss ? 
priv->mss : 0) - min(nAcked, priv->cwnd); } } else { priv->dup_acks = 0; // Slow start, congestion avoidance if (priv->cwnd < priv->ssthresh) { priv->cwnd += priv->mss; } else { priv->cwnd += max(1LU, priv->mss * priv->mss / priv->cwnd); } } } else if (is_duplicate_ack) { /* !?! Note, tcp says don't do this... but otherwise how does a closed window become open? */ priv->snd_wnd = seg->wnd << priv->swnd_scale; // Check duplicate acks if (seg->len > 0) { // it's a dup ack, but with a data payload, so don't modify priv->dup_acks } else if (priv->snd_una != priv->snd_nxt) { guint32 nInFlight; priv->dup_acks += 1; DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Received dup ack (dups: %u)", priv->dup_acks); if (priv->dup_acks == 3) { // (Fast Retransmit) int transmit_status; if (LARGER_OR_EQUAL (priv->snd_una, priv->recover) || seg->tsecr == priv->last_acked_ts) { /* NewReno */ /* Invoke fast retransmit RFC3782 section 3 step 1A*/ DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "enter recovery"); DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "recovery retransmit"); transmit_status = transmit(self, g_queue_peek_head (&priv->slist), now); if (transmit_status != 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Error transmitting recovery retransmit segment. Closing down."); closedown (self, transmit_status, CLOSEDOWN_LOCAL); return FALSE; } priv->recover = priv->snd_nxt; nInFlight = priv->snd_nxt - priv->snd_una; priv->ssthresh = max(nInFlight / 2, 2 * priv->mss); DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "ssthresh: %u = max((nInFlight: %u / 2), 2 * mss: %u)", priv->ssthresh, nInFlight, priv->mss); priv->cwnd = priv->ssthresh + 3 * priv->mss; priv->fast_recovery = TRUE; } else { DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Skipping fast recovery: recover: %u snd_una: %u", priv->recover, priv->snd_una); } } else if (priv->dup_acks > 3) { if (priv->fast_recovery) priv->cwnd += priv->mss; } } else { priv->dup_acks = 0; } } // !?! 
A bit hacky if ((priv->state == PSEUDO_TCP_SYN_RECEIVED) && !bConnect) { set_state_established (self); } /* Check for connection closure. Only pay attention to FIN segments if they * are in sequence; otherwise we’ve missed a packet earlier in the stream and * need to request retransmission first. */ if (priv->support_fin_ack) { /* @received_fin is set when, and only when, all segments preceding the FIN * have been acknowledged. This is to handle the case where the FIN arrives * out of order with a preceding data segment. */ if (seg->flags & FLAG_FIN) { priv->rcv_fin = seg->seq; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Setting rcv_fin = %u", priv->rcv_fin); } /* For the moment, FIN segments must not contain data. */ if (seg->flags & FLAG_FIN && seg->len != 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "FIN segment contained data; ignored"); return FALSE; } received_fin = (priv->rcv_nxt != 0 && priv->rcv_nxt + seg->len == priv->rcv_fin); /* Update the state machine, implementing all transitions on ‘rcv FIN’ or * ‘rcv ACK of FIN’ from RFC 793, Figure 6; and RFC 1122, §4.2.2.8. */ switch (priv->state) { case PSEUDO_TCP_ESTABLISHED: if (received_fin) { /* Received a FIN from the network, RFC 793, §3.5, Case 2. * The code below will send an ACK for the FIN. */ set_state (self, PSEUDO_TCP_CLOSE_WAIT); } break; case PSEUDO_TCP_CLOSING: if (is_fin_ack) { /* Handle the ACK of a locally-sent FIN flag. RFC 793, §3.5, Case 3. */ set_state (self, PSEUDO_TCP_TIME_WAIT); } break; case PSEUDO_TCP_LAST_ACK: if (is_fin_ack) { /* Handle the ACK of a locally-sent FIN flag. RFC 793, §3.5, Case 2. */ set_state_closed (self, 0); } break; case PSEUDO_TCP_FIN_WAIT_1: if (is_fin_ack && received_fin) { /* Simultaneous close with an ACK for a FIN previously sent, * RFC 793, §3.5, Case 3. */ set_state (self, PSEUDO_TCP_TIME_WAIT); } else if (is_fin_ack) { /* Handle the ACK of a locally-sent FIN flag. RFC 793, §3.5, Case 1. 
*/ set_state (self, PSEUDO_TCP_FIN_WAIT_2); } else if (received_fin) { /* Simultaneous close, RFC 793, §3.5, Case 3. */ set_state (self, PSEUDO_TCP_CLOSING); } break; case PSEUDO_TCP_FIN_WAIT_2: if (received_fin) { /* Local user closed the connection, RFC 793, §3.5, Case 1. */ set_state (self, PSEUDO_TCP_TIME_WAIT); } break; case PSEUDO_TCP_LISTEN: case PSEUDO_TCP_SYN_SENT: case PSEUDO_TCP_SYN_RECEIVED: case PSEUDO_TCP_TIME_WAIT: case PSEUDO_TCP_CLOSED: case PSEUDO_TCP_CLOSE_WAIT: /* Shouldn’t ever hit these cases. */ if (received_fin) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Unexpected state %u when FIN received", priv->state); } else if (is_fin_ack) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Unexpected state %u when FIN-ACK received", priv->state); } break; default: DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid state %u when FIN received", priv->state); return FALSE; } } else if (seg->flags & FLAG_FIN) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid FIN received when FIN-ACK support is disabled"); } else if (is_fin_ack) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid FIN-ACK received when FIN-ACK support is disabled"); } // If we make room in the send queue, notify the user // The goal it to make sure we always have at least enough data to fill the // window. We'd like to notify the app when we are halfway to that point. kIdealRefillSize = (priv->sbuf_len + priv->rbuf_len) / 2; snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); if (priv->bWriteEnable && snd_buffered < kIdealRefillSize) { priv->bWriteEnable = FALSE; if (priv->callbacks.PseudoTcpWritable) priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data); } /* Conditions where acks must be sent: * 1) Segment is too old (they missed an ACK) (immediately) * 2) Segment is too new (we missed a segment) (immediately) * 3) Segment has data (so we need to ACK!) (delayed) * ... so the only time we don't need to ACK, is an empty segment * that points to rcv_nxt! 
* 4) Segment has the FIN flag set (immediately) — note that the FIN flag * itself has to be included in the ACK as a numbered byte; * see RFC 793, §3.3. Also see: RFC 793, §3.5. */ if (seg->seq != priv->rcv_nxt) { sflags = sfDuplicateAck; // (Fast Recovery) } else if (seg->len != 0) { if (priv->ack_delay == 0) { sflags = sfImmediateAck; } else { sflags = sfDelayedAck; } } else if (received_fin) { /* FIN flags have a sequence number. Only acknowledge them after all * preceding octets have been acknowledged. */ sflags = sfImmediateAck; } if (sflags == sfDuplicateAck) { if (seg->seq > priv->rcv_nxt) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too new"); } else if (SMALLER_OR_EQUAL(seg->seq + seg->len, priv->rcv_nxt)) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too old"); } } // Adjust the incoming segment to fit our receive buffer if (SMALLER(seg->seq, priv->rcv_nxt)) { guint32 nAdjust = priv->rcv_nxt - seg->seq; if (nAdjust < seg->len) { seg->seq += nAdjust; seg->data += nAdjust; seg->len -= nAdjust; } else { seg->len = 0; } } available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf); if ((seg->seq + seg->len - priv->rcv_nxt) > available_space) { guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt - available_space; if (nAdjust < seg->len) { seg->len -= nAdjust; } else { seg->len = 0; } } bIgnoreData = (seg->flags & FLAG_CTL); if (!priv->support_fin_ack) bIgnoreData |= (priv->shutdown != SD_NONE); bNewData = FALSE; if (seg->len > 0) { if (bIgnoreData) { if (seg->seq == priv->rcv_nxt) { priv->rcv_nxt += seg->len; } } else { guint32 nOffset = seg->seq - priv->rcv_nxt; gsize res; res = pseudo_tcp_fifo_write_offset (&priv->rbuf, (guint8 *) seg->data, seg->len, nOffset); g_assert_cmpint (res, ==, seg->len); if (seg->seq == priv->rcv_nxt) { GList *iter = NULL; pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, seg->len); priv->rcv_nxt += seg->len; priv->rcv_wnd -= seg->len; bNewData = TRUE; iter = priv->rlist; while (iter && SMALLER_OR_EQUAL(((RSegment *)iter->data)->seq, 
priv->rcv_nxt)) { RSegment *data = (RSegment *)(iter->data); if (LARGER (data->seq + data->len, priv->rcv_nxt)) { guint32 nAdjust = (data->seq + data->len) - priv->rcv_nxt; sflags = sfImmediateAck; // (Fast Recovery) DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Recovered %u bytes (%u -> %u)", nAdjust, priv->rcv_nxt, priv->rcv_nxt + nAdjust); pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, nAdjust); priv->rcv_nxt += nAdjust; priv->rcv_wnd -= nAdjust; } g_slice_free (RSegment, priv->rlist->data); priv->rlist = g_list_delete_link (priv->rlist, priv->rlist); iter = priv->rlist; } } else { GList *iter = NULL; RSegment *rseg = g_slice_new0 (RSegment); DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Saving %u bytes (%u -> %u)", seg->len, seg->seq, seg->seq + seg->len); rseg->seq = seg->seq; rseg->len = seg->len; iter = priv->rlist; while (iter && SMALLER (((RSegment*)iter->data)->seq, rseg->seq)) { iter = g_list_next (iter); } priv->rlist = g_list_insert_before(priv->rlist, iter, rseg); } } } if (received_fin) { /* FIN flags have a sequence number. */ priv->rcv_nxt++; } attempt_send(self, sflags); // If we have new data, notify the user if (bNewData && priv->bReadEnable) { /* priv->bReadEnable = FALSE; — removed so that we’re always notified of * incoming pseudo-TCP data, rather than having to read the entire buffer * on each readable() callback before the next callback is enabled. * (When client-provided buffers are small, this is not possible.) */ if (priv->callbacks.PseudoTcpReadable) priv->callbacks.PseudoTcpReadable(self, priv->callbacks.user_data); } return TRUE; } static gboolean transmit(PseudoTcpSocket *self, SSegment *segment, guint32 now) { PseudoTcpSocketPrivate *priv = self->priv; guint32 nTransmit = min(segment->len, priv->mss); if (segment->xmit >= ((priv->state == PSEUDO_TCP_ESTABLISHED) ? 
15 : 30)) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "too many retransmits"); return ETIMEDOUT; } while (TRUE) { guint32 seq = segment->seq; guint8 flags = segment->flags; PseudoTcpWriteResult wres; /* The packet must not have already been acknowledged. */ g_assert_cmpuint (segment->seq - priv->snd_una, <=, 1024 * 1024 * 64); /* Write out the packet. */ wres = packet(self, seq, flags, segment->seq - priv->snd_una, nTransmit, now); if (wres == WR_SUCCESS) break; if (wres == WR_FAIL) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "packet failed"); return ECONNABORTED; /* FIXME: This error code doesn’t quite seem right */ } g_assert_cmpint (wres, ==, WR_TOO_LARGE); while (TRUE) { if (PACKET_MAXIMUMS[priv->msslevel + 1] == 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "MTU too small"); return EMSGSIZE; } /* !?! We need to break up all outstanding and pending packets and then retransmit!?! */ priv->mss = PACKET_MAXIMUMS[++priv->msslevel] - PACKET_OVERHEAD; // I added this... haven't researched actual formula priv->cwnd = 2 * priv->mss; if (priv->mss < nTransmit) { nTransmit = priv->mss; break; } } DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Adjusting mss to %u bytes ", priv->mss); } if (nTransmit < segment->len) { SSegment *subseg = g_slice_new0 (SSegment); subseg->seq = segment->seq + nTransmit; subseg->len = segment->len - nTransmit; subseg->flags = segment->flags; subseg->xmit = segment->xmit; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "mss reduced to %u", priv->mss); segment->len = nTransmit; g_queue_insert_after (&priv->slist, g_queue_find (&priv->slist, segment), subseg); if (subseg->xmit == 0) g_queue_insert_after (&priv->unsent_slist, g_queue_find (&priv->unsent_slist, segment), subseg); } if (segment->xmit == 0) { g_assert (g_queue_peek_head (&priv->unsent_slist) == segment); g_queue_pop_head (&priv->unsent_slist); priv->snd_nxt += segment->len; /* FIN flags require acknowledgement. 
*/ if (segment->len == 0 && segment->flags & FLAG_FIN) priv->snd_nxt++; } segment->xmit += 1; if (priv->rto_base == 0) { priv->rto_base = now; } return 0; } static void attempt_send(PseudoTcpSocket *self, SendFlags sflags) { PseudoTcpSocketPrivate *priv = self->priv; guint32 now = get_current_time (self); gboolean bFirst = TRUE; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Attempting send with flags %u.", sflags); if (time_diff(now, priv->lastsend) > (long) priv->rx_rto) { priv->cwnd = priv->mss; } while (TRUE) { guint32 cwnd; guint32 nWindow; guint32 nInFlight; guint32 nUseable; guint32 nAvailable; gsize snd_buffered; GList *iter; SSegment *sseg; int transmit_status; cwnd = priv->cwnd; if ((priv->dup_acks == 1) || (priv->dup_acks == 2)) { // Limited Transmit cwnd += priv->dup_acks * priv->mss; } nWindow = min(priv->snd_wnd, cwnd); nInFlight = priv->snd_nxt - priv->snd_una; nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0; snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf); if (snd_buffered < nInFlight) /* iff a FIN has been sent */ nAvailable = 0; else nAvailable = min(snd_buffered - nInFlight, priv->mss); if (nAvailable > nUseable) { if (nUseable * 4 < nWindow) { // RFC 813 - avoid SWS nAvailable = 0; } else { nAvailable = nUseable; } } if (bFirst) { gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); bFirst = FALSE; DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "[cwnd: %u nWindow: %u nInFlight: %u " "nAvailable: %u nQueued: %" G_GSIZE_FORMAT " nEmpty: %" G_GSIZE_FORMAT " nWaiting: %zu ssthresh: %u]", priv->cwnd, nWindow, nInFlight, nAvailable, snd_buffered, available_space, snd_buffered - nInFlight, priv->ssthresh); } if (sflags == sfDuplicateAck) { packet(self, priv->snd_nxt, 0, 0, 0, now); sflags = sfNone; continue; } if (nAvailable == 0 && sflags != sfFin && sflags != sfRst) { if (sflags == sfNone) return; // If this is an immediate ack, or the second delayed ack if ((sflags == sfImmediateAck || sflags == sfDuplicateAck) || priv->t_ack) { 
packet(self, priv->snd_nxt, 0, 0, 0, now); } else { priv->t_ack = now; } return; } // Nagle algorithm // If there is data already in-flight, and we haven't a full segment of // data ready to send then hold off until we get more to send, or the // in-flight data is acknowledged. if (priv->use_nagling && sflags != sfFin && sflags != sfRst && (priv->snd_nxt > priv->snd_una) && (nAvailable < priv->mss)) { return; } // Find the next segment to transmit iter = g_queue_peek_head_link (&priv->unsent_slist); if (iter == NULL) return; sseg = iter->data; // If the segment is too large, break it into two if (sseg->len > nAvailable && sflags != sfFin && sflags != sfRst) { SSegment *subseg = g_slice_new0 (SSegment); subseg->seq = sseg->seq + nAvailable; subseg->len = sseg->len - nAvailable; subseg->flags = sseg->flags; sseg->len = nAvailable; g_queue_insert_after (&priv->unsent_slist, iter, subseg); g_queue_insert_after (&priv->slist, g_queue_find (&priv->slist, sseg), subseg); } transmit_status = transmit(self, sseg, now); if (transmit_status != 0) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "transmit failed"); // TODO: Is this the right thing ? closedown (self, transmit_status, CLOSEDOWN_REMOTE); return; } if (sflags == sfImmediateAck || sflags == sfDelayedAck) sflags = sfNone; } } /* If @source is %CLOSEDOWN_REMOTE, don’t send an RST packet, since closedown() * has been called as a result of an RST segment being received. * See: RFC 1122, §4.2.2.13. */ static void closedown (PseudoTcpSocket *self, guint32 err, ClosedownSource source) { PseudoTcpSocketPrivate *priv = self->priv; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Closing down socket %p with %s error %u.", self, (source == CLOSEDOWN_LOCAL) ? 
"local" : "remote", err); if (source == CLOSEDOWN_LOCAL && priv->support_fin_ack) { queue_rst_message (self); attempt_send (self, sfRst); } else if (source == CLOSEDOWN_LOCAL) { priv->shutdown = SD_FORCEFUL; } /* ‘Cute’ little navigation through the state machine to avoid breaking the * invariant that CLOSED can only be reached from TIME-WAIT or LAST-ACK. */ switch (priv->state) { case PSEUDO_TCP_LISTEN: case PSEUDO_TCP_SYN_SENT: break; case PSEUDO_TCP_SYN_RECEIVED: case PSEUDO_TCP_ESTABLISHED: set_state (self, PSEUDO_TCP_FIN_WAIT_1); /* Fall through. */ case PSEUDO_TCP_FIN_WAIT_1: set_state (self, PSEUDO_TCP_FIN_WAIT_2); /* Fall through. */ case PSEUDO_TCP_FIN_WAIT_2: case PSEUDO_TCP_CLOSING: set_state (self, PSEUDO_TCP_TIME_WAIT); break; case PSEUDO_TCP_CLOSE_WAIT: set_state (self, PSEUDO_TCP_LAST_ACK); break; case PSEUDO_TCP_LAST_ACK: case PSEUDO_TCP_TIME_WAIT: case PSEUDO_TCP_CLOSED: default: break; } set_state_closed (self, err); } static void adjustMTU(PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; // Determine our current mss level, so that we can adjust appropriately later for (priv->msslevel = 0; PACKET_MAXIMUMS[priv->msslevel + 1] > 0; ++priv->msslevel) { if (((guint16)PACKET_MAXIMUMS[priv->msslevel]) <= priv->mtu_advise) { break; } } priv->mss = priv->mtu_advise - PACKET_OVERHEAD; // !?! Should we reset priv->largest here? 
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Adjusting mss to %u bytes", priv->mss); // Enforce minimums on ssthresh and cwnd priv->ssthresh = max(priv->ssthresh, 2 * priv->mss); priv->cwnd = max(priv->cwnd, priv->mss); } static void apply_window_scale_option (PseudoTcpSocket *self, guint8 scale_factor) { PseudoTcpSocketPrivate *priv = self->priv; priv->swnd_scale = scale_factor; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Setting scale factor to %u", scale_factor); } static void apply_fin_ack_option (PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; priv->support_fin_ack = TRUE; } static void apply_option (PseudoTcpSocket *self, guint8 kind, const guint8 *data, guint32 len) { switch (kind) { case TCP_OPT_MSS: DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer specified MSS option which is not supported."); // TODO: Implement. break; case TCP_OPT_WND_SCALE: // Window scale factor. // http://www.ietf.org/rfc/rfc1323.txt if (len != 1) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid window scale option received."); return; } apply_window_scale_option(self, data[0]); break; case TCP_OPT_FIN_ACK: // FIN-ACK support. DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "FIN-ACK support enabled."); apply_fin_ack_option (self); break; case TCP_OPT_EOL: case TCP_OPT_NOOP: /* Nothing to do. */ break; default: DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid TCP option %u", kind); break; } } static void parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len) { PseudoTcpSocketPrivate *priv = self->priv; gboolean has_window_scaling_option = FALSE; gboolean has_fin_ack_option = FALSE; guint32 pos = 0; // See http://www.freesoft.org/CIE/Course/Section4/8.htm for // parsing the options list. while (pos < len) { guint8 kind = TCP_OPT_EOL; guint8 opt_len; if (len < pos + 1) return; kind = data[pos]; pos++; if (kind == TCP_OPT_EOL) { // End of option list. break; } else if (kind == TCP_OPT_NOOP) { // No op. continue; } if (len < pos + 1) return; // Length of this option. 
opt_len = data[pos]; pos++; if (len < pos + opt_len) return; // Content of this option. if (opt_len <= len - pos) { apply_option (self, kind, data + pos, opt_len); pos += opt_len; } else { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received."); return; } if (kind == TCP_OPT_WND_SCALE) has_window_scaling_option = TRUE; else if (kind == TCP_OPT_FIN_ACK) has_fin_ack_option = TRUE; } if (!has_window_scaling_option) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support window scaling"); if (priv->rwnd_scale > 0) { // Peer doesn't support TCP options and window scaling. // Revert receive buffer size to default value. resize_receive_buffer (self, DEFAULT_RCV_BUF_SIZE); priv->swnd_scale = 0; } } if (!has_fin_ack_option) { DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support FIN-ACK"); priv->support_fin_ack = FALSE; } } static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size) { PseudoTcpSocketPrivate *priv = self->priv; priv->sbuf_len = new_size; pseudo_tcp_fifo_set_capacity (&priv->sbuf, new_size); } static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size) { PseudoTcpSocketPrivate *priv = self->priv; guint8 scale_factor = 0; gboolean result; gsize available_space; if (priv->rbuf_len == new_size) return; // Determine the scale factor such that the scaled window size can fit // in a 16-bit unsigned integer. while (new_size > 0xFFFF) { ++scale_factor; new_size >>= 1; } // Determine the proper size of the buffer. new_size <<= scale_factor; result = pseudo_tcp_fifo_set_capacity (&priv->rbuf, new_size); // Make sure the new buffer is large enough to contain data in the old // buffer. This should always be true because this method is called either // before connection is established or when peers are exchanging connect // messages. 
g_assert (result); priv->rbuf_len = new_size; priv->rwnd_scale = scale_factor; priv->ssthresh = new_size; available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf); priv->rcv_wnd = available_space; } gint pseudo_tcp_socket_get_available_bytes (PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; return pseudo_tcp_fifo_get_buffered (&priv->rbuf); } gboolean pseudo_tcp_socket_can_send (PseudoTcpSocket *self) { return (pseudo_tcp_socket_get_available_send_space (self) > 0); } gsize pseudo_tcp_socket_get_available_send_space (PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; gsize ret; if (!pseudo_tcp_state_has_sent_fin (priv->state)) { ret = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf); } else { ret = 0; } if (ret == 0) priv->bWriteEnable = TRUE; return ret; } /* State names are capitalised and formatted as in RFC 793. */ static const gchar * pseudo_tcp_state_get_name (PseudoTcpState state) { switch (state) { case PSEUDO_TCP_LISTEN: return "LISTEN"; case PSEUDO_TCP_SYN_SENT: return "SYN-SENT"; case PSEUDO_TCP_SYN_RECEIVED: return "SYN-RECEIVED"; case PSEUDO_TCP_ESTABLISHED: return "ESTABLISHED"; case PSEUDO_TCP_CLOSED: return "CLOSED"; case PSEUDO_TCP_FIN_WAIT_1: return "FIN-WAIT-1"; case PSEUDO_TCP_FIN_WAIT_2: return "FIN-WAIT-2"; case PSEUDO_TCP_CLOSING: return "CLOSING"; case PSEUDO_TCP_TIME_WAIT: return "TIME-WAIT"; case PSEUDO_TCP_CLOSE_WAIT: return "CLOSE-WAIT"; case PSEUDO_TCP_LAST_ACK: return "LAST-ACK"; default: return "UNKNOWN"; } } static void set_state (PseudoTcpSocket *self, PseudoTcpState new_state) { PseudoTcpSocketPrivate *priv = self->priv; PseudoTcpState old_state = priv->state; if (new_state == old_state) return; DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State %s → %s.", pseudo_tcp_state_get_name (old_state), pseudo_tcp_state_get_name (new_state)); /* Check whether it’s a valid state transition. 
*/ #define TRANSITION(OLD, NEW) \ (old_state == PSEUDO_TCP_##OLD && \ new_state == PSEUDO_TCP_##NEW) /* Valid transitions. See: RFC 793, p23; RFC 1122, §4.2.2.8. */ g_assert (/* RFC 793, p23. */ TRANSITION (CLOSED, SYN_SENT) || TRANSITION (SYN_SENT, CLOSED) || TRANSITION (CLOSED, LISTEN) || TRANSITION (LISTEN, CLOSED) || TRANSITION (LISTEN, SYN_SENT) || TRANSITION (LISTEN, SYN_RECEIVED) || TRANSITION (SYN_SENT, SYN_RECEIVED) || TRANSITION (SYN_RECEIVED, ESTABLISHED) || TRANSITION (SYN_SENT, ESTABLISHED) || TRANSITION (SYN_RECEIVED, FIN_WAIT_1) || TRANSITION (ESTABLISHED, FIN_WAIT_1) || TRANSITION (ESTABLISHED, CLOSE_WAIT) || TRANSITION (FIN_WAIT_1, FIN_WAIT_2) || TRANSITION (FIN_WAIT_1, CLOSING) || TRANSITION (CLOSE_WAIT, LAST_ACK) || TRANSITION (FIN_WAIT_2, TIME_WAIT) || TRANSITION (CLOSING, TIME_WAIT) || TRANSITION (LAST_ACK, CLOSED) || TRANSITION (TIME_WAIT, CLOSED) || /* RFC 1122, §4.2.2.8. */ TRANSITION (SYN_RECEIVED, LISTEN) || TRANSITION (FIN_WAIT_1, TIME_WAIT)); #undef TRANSITION priv->state = new_state; } static void set_state_established (PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; set_state (self, PSEUDO_TCP_ESTABLISHED); adjustMTU (self); if (priv->callbacks.PseudoTcpOpened) priv->callbacks.PseudoTcpOpened (self, priv->callbacks.user_data); } /* (err == 0) means no error. */ static void set_state_closed (PseudoTcpSocket *self, guint32 err) { PseudoTcpSocketPrivate *priv = self->priv; set_state (self, PSEUDO_TCP_CLOSED); /* Only call the callback if there was an error. */ if (priv->callbacks.PseudoTcpClosed && err != 0) priv->callbacks.PseudoTcpClosed (self, err, priv->callbacks.user_data); } gboolean pseudo_tcp_socket_is_closed (PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; return (priv->state == PSEUDO_TCP_CLOSED); } gboolean pseudo_tcp_socket_is_closed_remotely (PseudoTcpSocket *self) { PseudoTcpSocketPrivate *priv = self->priv; return pseudo_tcp_state_has_received_fin (priv->state); }