1package DJabberd::Connection::ClusterIn;
2use strict;
3use base 'DJabberd::Connection';
4use DJabberd::ClusterMessage;
5use DJabberd::ClusterMessage::DeliverStanza;
6use fields (
7            'buf',
8            );
9use DJabberd::Log;
10
# Constructor for an incoming cluster connection.
#
#   $class  - class name to construct
#   $sock   - the accepted socket; its fileno becomes the connection id
#   $server - server object, stored in the 'server' field
#
# Returns the new connection object.
sub new {
    my $class  = shift;
    my $sock   = shift;
    my $server = shift;

    # Danga::Socket::new is invoked as a plain function with our class name
    # (not via SUPER), exactly as the rest of this constructor expects.
    my $self = Danga::Socket::new($class, $sock);

    $self->{buf}    = '';      # unparsed bytes accumulated by event_read
    $self->{server} = $server;
    $self->{vhost}  = undef;   # filled in by event_read on a "vhost=" payload
    $self->{id}     = fileno($sock);
    $self->{log}    = DJabberd::Log->get_logger($class);

    my $peer = $self->peer_ip_string || "<undef>";
    $self->log->debug("New clusterid connection '$self->{id}' from " . $peer);

    return $self;
}
23
# Attach a vhost to this connection, but only if the vhost has its 's2s'
# entry set to a true value.
#
# Returns 0 when the vhost is refused; otherwise returns whatever the
# parent class's set_vhost returns.
sub set_vhost {
    my ($self, $vhost) = @_;

    if ($vhost->{s2s}) {
        return $self->SUPER::set_vhost($vhost);
    }

    return 0;
}
29
# Read callback: pull pending bytes off the socket, append them to the
# connection buffer, and process every complete frame now available.
#
# Frame layout, as parsed below:
#   "DJAB"  4-byte magic
#   length  4 bytes, unpacked with "N" (32-bit big-endian unsigned)
#   payload <length> bytes
#
# A payload is first offered to DJabberd::ClusterMessage->thaw.  If that
# yields nothing and the payload looks like "vhost=<hostname>", the vhost is
# looked up on the server and remembered for later frames.  Any other frame
# requires a vhost to have been set already.
#
# The connection is closed (and processing stops) when:
#   - the read returns nothing,
#   - a frame announces a payload larger than the size cap,
#   - the requested vhost is unknown,
#   - a non-vhost frame arrives before any vhost was set,
#   - the buffer holds 8+ bytes that do not start with a valid header.
sub event_read {
    my $self = shift;
    my $bref = $self->read(20_000)
        or return $self->close;

    my $max_payload = 5 * 1024 * 1024; # arbitrary

    $self->{buf} .= $$bref;
    while ($self->{buf} =~ /^DJAB(....)/s) {
        my $len = unpack("N", $1);

        # The length comes from the peer; refuse it by dropping this one
        # connection rather than throwing out of the read callback.
        if ($len > $max_payload) {
            $self->log->error("packet too big ($len bytes), closing cluster connection");
            return $self->close;
        }

        # Frame not fully received yet: keep the buffer and wait for more.
        return unless length($self->{buf}) >= $len + 8;

        substr($self->{buf}, 0, 8, '');                  # drop magic + length
        my $payload = substr($self->{buf}, 0, $len, ''); # consume payload
        my $cmsg = eval { DJabberd::ClusterMessage->thaw(\$payload) };

        if (! $cmsg && $payload =~ /^vhost=(.+)/) {
            my $hostname = $1;
            my $vhost = $self->server->lookup_vhost($hostname);
            unless ($vhost) {
                # Stop here: continuing would keep parsing frames on a
                # connection that has already been closed.
                $self->log->warn("unknown vhost '$hostname' on cluster connection, closing");
                return $self->close;
            }
            $self->{vhost} = $vhost;
            next;
        }

        # need a vhost past this point
        return $self->close unless $self->{vhost};

        if ($cmsg) {
            $cmsg->process($self->{vhost});
        } else {
            $self->log->warn("Got payload text: [$payload]");
        }
    }

    # The loop matches whenever the buffer starts with a full 8-byte header,
    # so 8 or more leftover bytes mean the stream is out of sync.  Nothing is
    # ever removed from the front of the buffer in that state, so it could
    # only grow; drop the connection instead.
    if (length($self->{buf}) >= 8) {
        $self->log->error("malformed cluster frame header, closing cluster connection");
        return $self->close;
    }

    return;
}
63
64
651;
66