package DJabberd::Connection::ClusterIn;
use strict;
use warnings;
use base 'DJabberd::Connection';
use DJabberd::ClusterMessage;
use DJabberd::ClusterMessage::DeliverStanza;
use fields (
            'buf',   # bytes received so far that have not yet been consumed as whole packets
            );
use DJabberd::Log;

# Largest payload length accepted from a peer, in bytes (arbitrary limit).
use constant MAX_PAYLOAD_LEN => 5 * 1024 * 1024;

# Size of the packet header: the 4-byte "DJAB" magic plus a 4-byte length.
use constant HEADER_LEN => 8;

# Construct a connection object around an already-accepted socket.
#
#   $class  - class to instantiate
#   $sock   - the connected socket
#   $server - the server object owning this connection; used later by
#             event_read to look up vhosts
#
# Returns the new connection object.
sub new {
    my ($class, $sock, $server) = @_;
    my $self = Danga::Socket::new($class, $sock);

    $self->{id}      = fileno($sock);
    $self->{vhost}   = undef;  # set once the peer sends a "vhost=..." packet
    $self->{server}  = $server;
    $self->{buf}     = '';
    $self->{log}     = DJabberd::Log->get_logger($class);
    $self->log->debug("New clusterid connection '$self->{id}' from " . ($self->peer_ip_string || "<undef>"));
    return $self;
}

# Associate a vhost with this connection, but only if the vhost has a true
# {s2s} entry.  Returns 0 when it does not; otherwise returns whatever the
# parent class's set_vhost returns.
#
# NOTE(review): event_read below assigns $self->{vhost} directly and does not
# go through this method -- confirm whether that bypass is intentional.
sub set_vhost {
    my ($self, $vhost) = @_;
    return 0 unless $vhost->{s2s};
    return $self->SUPER::set_vhost($vhost);
}

# Read handler: pull available bytes off the socket, append them to the
# receive buffer and process every complete packet now in it.
#
# Packet framing, as parsed here:
#
#   "DJAB" . pack("N", $len) . <$len bytes of payload>
#
# A payload is either something DJabberd::ClusterMessage->thaw can decode, or
# the plain text "vhost=HOSTNAME", which selects the vhost that subsequent
# messages are processed against.
#
# The connection is closed when the read fails/hits EOF, when a packet
# announces a payload larger than MAX_PAYLOAD_LEN, when the named vhost is
# unknown, or when a non-vhost packet arrives before any vhost was selected.
sub event_read {
    my $self = shift;
    my $bref = $self->read(20_000)
        or return $self->close;

    $self->{buf} .= $$bref;

    # NOTE(review): if the buffer does not begin with "DJAB" the loop simply
    # does not run and the bytes stay buffered -- confirm peers are trusted
    # enough that this cannot grow without bound.
    while ($self->{buf} =~ /^DJAB(....)/s) {
        my $len = unpack("N", $1);

        # Do not die here: an exception thrown from a read handler would
        # propagate out into the event loop.  Drop only the offending peer.
        if ($len > MAX_PAYLOAD_LEN) {
            $self->log->error("Cluster packet too big ($len bytes) on connection '$self->{id}', closing");
            return $self->close;
        }

        # Wait for more data until the whole packet has arrived.
        return unless length($self->{buf}) >= $len + HEADER_LEN;

        # Consume the header, then the payload, from the front of the buffer.
        substr($self->{buf}, 0, HEADER_LEN, '');
        my $payload = substr($self->{buf}, 0, $len, '');
        my $cmsg = eval { DJabberd::ClusterMessage->thaw(\$payload) };

        if (! $cmsg && $payload =~ /^vhost=(.+)/) {
            my $hostname = $1;
            # Stop processing once the connection is closed; falling through
            # would store an undef vhost and keep looping on a dead socket.
            my $vhost = $self->server->lookup_vhost($hostname)
                or return $self->close;
            $self->{vhost} = $vhost;
            next;
        }

        # need a vhost past this point
        return $self->close unless $self->{vhost};

        if ($cmsg) {
            $cmsg->process($self->{vhost});
        } else {
            # Unrecognized payload: report through the logger rather than
            # printing to the daemon's STDOUT.
            $self->log->warn("Got payload text: [$payload]");
        }
    }
}


1;