1package DJabberd::Queue;
2
3use strict;
4use warnings;
5
6use base 'Exporter';
7
8our @EXPORT_OK = qw(NO_CONN RESOLVING CONNECTING CONNECTED);
9
10use DJabberd::Log;
11
12our $logger = DJabberd::Log->get_logger;
13
14use fields (
15            'vhost',
16            'endpoints',
17            'to_deliver',
18            'last_connect_fail',
19            'state',
20            'connection',
21            );
22
23use constant NO_CONN    => \ "no connection";
24use constant RESOLVING  => \ "resolving";
25use constant CONNECTING => \ "connecting";
26use constant CONNECTED  => \ "connected";
27
28sub new {
29    my $self = shift;
30    my %opts = @_;
31
32    $self = fields::new($self) unless ref $self;
33
34    $self->{vhost}      = delete $opts{vhost}  or die "vhost required";
35    Carp::croak("Not a vhost: $self->{vhost}") unless $self->vhost->isa("DJabberd::VHost");
36
37    if (my $endpoints = delete $opts{endpoints}) {
38        Carp::croak("endpoints must be an arrayref") unless (ref $endpoints eq 'ARRAY');
39          $self->{endpoints} = $endpoints;
40      } else {
41          $self->{endpoints} = [];
42      }
43
44    die "too many opts" if %opts;
45
46    $self->{to_deliver} = [];  # DJabberd::QueueItem?
47    $self->{last_connect_fail} = 0;  # unixtime of last connection failure
48
49    $self->{state} = NO_CONN;   # see states above
50
51    return $self;
52}
53
54sub endpoints {
55    my $self = shift;
56    my $endpoints = $self->{endpoints};
57
58    if (@_) {
59        @$endpoints = @_;
60    }
61    return @$endpoints;
62}
63
64# called by Connection::ServerOut constructor
65sub set_connection {
66    my ($self, $conn) = @_;
67    $logger->debug("Set connection for queue to '$self->{domain}' to connection '$conn->{id}'");
68    $self->{connection} = $conn;
69}
70
71sub vhost {
72    my $self = shift;
73    return $self->{vhost};
74}
75
76sub enqueue {
77    my ($self, $stanza, $cb) = @_;
78
79    $logger->debug("Queuing stanza (" . $stanza . ") for");
80
81    if ($self->{state} == NO_CONN) {
82        $logger->debug("  .. starting to connect to");
83        $self->start_connecting;
84    }
85
86    if ($self->{state} == CONNECTED) {
87        $logger->debug("  .. already connected, writing stanza.");
88        $self->{connection}->send_stanza($stanza);
89        $cb->delivered;
90    } else {
91        $logger->debug("  .. pushing queue item.");
92        push @{ $self->{to_deliver} }, DJabberd::QueueItem->new($stanza, $cb);
93    }
94}
95
96sub failed_to_connect {
97    my $self = shift;
98    $self->{state}             = NO_CONN;
99    $self->{last_connect_fail} = time();
100
101    $logger->debug("Failed to connect queue");
102    while (my $qi = shift @{ $self->{to_deliver} }) {
103        $qi->callback->error;
104    }
105}
106
107# called by our connection when it's connected
108sub on_connection_connected {
109    my ($self, $conn) = @_;
110    $logger->debug("connection $conn->{id} connected!  conn=$conn->{id}, selfcon=$self->{connection}->{id}");
111
112    # TODO why are we this checking here?
113    return unless $conn == $self->{connection};
114
115    $logger->debug(" ... unloading queue items");
116    $self->{state} = CONNECTED;
117    while (my $qi = shift @{ $self->{to_deliver} }) {
118        $conn->send_stanza($qi->stanza);
119        $qi->callback->delivered;
120        # TODO: the connection might need to handle marking things as delivered
121        # otherwise we could run into a problem if the connection dies mid-stanza.
122    }
123}
124
125sub on_connection_failed {
126   my ($self, $conn) = @_;
127   $logger->debug("connection failed for queue");
128   return unless $conn == $self->{connection};
129   $logger->debug("  .. match");
130   return $self->failed_to_connect;
131}
132
133sub on_connection_error {
134   my ($self, $conn) = @_;
135   $logger->debug("connection error for queue");
136   return unless $conn == $self->{connection};
137   $logger->debug("  .. match");
138   my $pre_state = $self->{state};
139
140   $self->{state}      = NO_CONN;
141   $self->{connection} = undef;
142
143   if ($pre_state == CONNECTING) {
144       # died while connecting:  no more luck
145       $self->give_up_connecting;
146       $self->on_final_error;
147   } else {
148       # died during an active connection, let's try again
149       if (@{ $self->{to_deliver} }) {
150           $logger->warn("Reconnecting to '$self->{domain}'");
151           $self->start_connecting;
152       }
153   }
154}
155
156sub on_final_error {
157    my $self = shift;
158    while (my $qi = shift @{ $self->{to_deliver} }) {
159        $qi->callback->error("connection failure");
160    }
161}
162
163sub start_connecting {
164    my $self = shift;
165    $logger->debug("Starting connection");
166    die unless $self->{state} == NO_CONN;
167
168    my $endpoints = $self->{endpoints};
169
170    unless (@$endpoints) {
171        $self->failed_to_connect;
172        return;
173    }
174
175    $self->{state} = CONNECTING;
176
177    my $endpt = $endpoints->[0];
178    my $conn = $self->new_connection(endpoint => $endpt, queue => $self);
179    $self->set_connection($conn);
180    $conn->start_connecting;
181
182}
183
184sub new_connection {
185    die "Sorry, this method needs to be overridden in your subclass of " . ref( $_[0] ) . ".";
186}
187
188package DJabberd::QueueItem;
189
190sub new {
191    my ($class, $stanza, $cb) = @_;
192    return bless [ $stanza, $cb ], $class;
193}
194
195sub stanza   { return $_[0][0] }
196sub callback { return $_[0][1] }
197
1981;
199