1package DJabberd::Queue; 2 3use strict; 4use warnings; 5 6use base 'Exporter'; 7 8our @EXPORT_OK = qw(NO_CONN RESOLVING CONNECTING CONNECTED); 9 10use DJabberd::Log; 11 12our $logger = DJabberd::Log->get_logger; 13 14use fields ( 15 'vhost', 16 'endpoints', 17 'to_deliver', 18 'last_connect_fail', 19 'state', 20 'connection', 21 ); 22 23use constant NO_CONN => \ "no connection"; 24use constant RESOLVING => \ "resolving"; 25use constant CONNECTING => \ "connecting"; 26use constant CONNECTED => \ "connected"; 27 28sub new { 29 my $self = shift; 30 my %opts = @_; 31 32 $self = fields::new($self) unless ref $self; 33 34 $self->{vhost} = delete $opts{vhost} or die "vhost required"; 35 Carp::croak("Not a vhost: $self->{vhost}") unless $self->vhost->isa("DJabberd::VHost"); 36 37 if (my $endpoints = delete $opts{endpoints}) { 38 Carp::croak("endpoints must be an arrayref") unless (ref $endpoints eq 'ARRAY'); 39 $self->{endpoints} = $endpoints; 40 } else { 41 $self->{endpoints} = []; 42 } 43 44 die "too many opts" if %opts; 45 46 $self->{to_deliver} = []; # DJabberd::QueueItem? 47 $self->{last_connect_fail} = 0; # unixtime of last connection failure 48 49 $self->{state} = NO_CONN; # see states above 50 51 return $self; 52} 53 54sub endpoints { 55 my $self = shift; 56 my $endpoints = $self->{endpoints}; 57 58 if (@_) { 59 @$endpoints = @_; 60 } 61 return @$endpoints; 62} 63 64# called by Connection::ServerOut constructor 65sub set_connection { 66 my ($self, $conn) = @_; 67 $logger->debug("Set connection for queue to '$self->{domain}' to connection '$conn->{id}'"); 68 $self->{connection} = $conn; 69} 70 71sub vhost { 72 my $self = shift; 73 return $self->{vhost}; 74} 75 76sub enqueue { 77 my ($self, $stanza, $cb) = @_; 78 79 $logger->debug("Queuing stanza (" . $stanza . ") for"); 80 81 if ($self->{state} == NO_CONN) { 82 $logger->debug(" .. starting to connect to"); 83 $self->start_connecting; 84 } 85 86 if ($self->{state} == CONNECTED) { 87 $logger->debug(" .. already connected, writing stanza."); 88 $self->{connection}->send_stanza($stanza); 89 $cb->delivered; 90 } else { 91 $logger->debug(" .. pushing queue item."); 92 push @{ $self->{to_deliver} }, DJabberd::QueueItem->new($stanza, $cb); 93 } 94} 95 96sub failed_to_connect { 97 my $self = shift; 98 $self->{state} = NO_CONN; 99 $self->{last_connect_fail} = time(); 100 101 $logger->debug("Failed to connect queue"); 102 while (my $qi = shift @{ $self->{to_deliver} }) { 103 $qi->callback->error; 104 } 105} 106 107# called by our connection when it's connected 108sub on_connection_connected { 109 my ($self, $conn) = @_; 110 $logger->debug("connection $conn->{id} connected! conn=$conn->{id}, selfcon=$self->{connection}->{id}"); 111 112 # TODO why are we this checking here? 113 return unless $conn == $self->{connection}; 114 115 $logger->debug(" ... unloading queue items"); 116 $self->{state} = CONNECTED; 117 while (my $qi = shift @{ $self->{to_deliver} }) { 118 $conn->send_stanza($qi->stanza); 119 $qi->callback->delivered; 120 # TODO: the connection might need to handle marking things as delivered 121 # otherwise we could run into a problem if the connection dies mid-stanza. 122 } 123} 124 125sub on_connection_failed { 126 my ($self, $conn) = @_; 127 $logger->debug("connection failed for queue"); 128 return unless $conn == $self->{connection}; 129 $logger->debug(" .. match"); 130 return $self->failed_to_connect; 131} 132 133sub on_connection_error { 134 my ($self, $conn) = @_; 135 $logger->debug("connection error for queue"); 136 return unless $conn == $self->{connection}; 137 $logger->debug(" .. match"); 138 my $pre_state = $self->{state}; 139 140 $self->{state} = NO_CONN; 141 $self->{connection} = undef; 142 143 if ($pre_state == CONNECTING) { 144 # died while connecting: no more luck 145 $self->give_up_connecting; 146 $self->on_final_error; 147 } else { 148 # died during an active connection, let's try again 149 if (@{ $self->{to_deliver} }) { 150 $logger->warn("Reconnecting to '$self->{domain}'"); 151 $self->start_connecting; 152 } 153 } 154} 155 156sub on_final_error { 157 my $self = shift; 158 while (my $qi = shift @{ $self->{to_deliver} }) { 159 $qi->callback->error("connection failure"); 160 } 161} 162 163sub start_connecting { 164 my $self = shift; 165 $logger->debug("Starting connection"); 166 die unless $self->{state} == NO_CONN; 167 168 my $endpoints = $self->{endpoints}; 169 170 unless (@$endpoints) { 171 $self->failed_to_connect; 172 return; 173 } 174 175 $self->{state} = CONNECTING; 176 177 my $endpt = $endpoints->[0]; 178 my $conn = $self->new_connection(endpoint => $endpt, queue => $self); 179 $self->set_connection($conn); 180 $conn->start_connecting; 181 182} 183 184sub new_connection { 185 die "Sorry, this method needs to be overridden in your subclass of " . ref( $_[0] ) . "."; 186} 187 188package DJabberd::QueueItem; 189 190sub new { 191 my ($class, $stanza, $cb) = @_; 192 return bless [ $stanza, $cb ], $class; 193} 194 195sub stanza { return $_[0][0] } 196sub callback { return $_[0][1] } 197 1981; 199