1package PocketIO::Pool;
2
3use strict;
4use warnings;
5
6use Scalar::Util qw(blessed);
7
8use PocketIO::Connection;
9
10use constant DEBUG => $ENV{POCKETIO_POOL_DEBUG};
11
# Construct an empty pool.
#
# Any key/value pairs passed in become attributes of the object.  The
# 'connections', 'rooms' and 'revrooms' tables always start out empty, even
# when the caller supplied values under those names.
sub new {
    my $class = shift;

    my $self = bless {@_}, $class;

    # connections: id        => connection
    # rooms:       room name => { id => connection }
    # revrooms:    id        => { room name => connection }
    @{$self}{qw(connections rooms revrooms)} = ({}, {}, {});

    return $self;
}
24
# Look up a connection registered in this pool.
#
# $conn may be a blessed object (its 'id' method supplies the key) or a
# plain connection id.  Returns whatever is stored under that id in the
# 'connections' table, which is undef when nothing is registered.
sub find_local_connection {
    my ($self, $conn) = @_;

    my $id = $conn;
    $id = $conn->id if blessed $conn;

    return $self->{connections}{$id};
}
33
# Public lookup entry point.  In this class it forwards every argument to
# find_local_connection and returns that method's result unchanged.
sub find_connection {
    my ($self, @args) = @_;

    return $self->find_local_connection(@args);
}
39
# Create a connection, register it in the pool and hand it to a callback.
#
# The last argument is the callback; everything before it is passed on to
# _build_connection.  The new connection is stored under its id and the
# callback is invoked with it.  Returns whatever the callback returns.
sub add_connection {
    my ($self, @args) = @_;

    # The callback always travels as the final argument.
    my $cb = pop @args;

    my $conn = $self->_build_connection(@args);
    $self->{connections}{ $conn->id } = $conn;

    warn "Added connection '" . $conn->id . "'\n" if DEBUG;

    return $cb->($conn);
}
52
# Unregister a connection and drop it from every room it had joined.
#
# $conn may be a blessed object (its 'id' method supplies the key) or a
# plain connection id.  $cb is optional; when given it is called with no
# arguments after the bookkeeping is done and its result is returned.
sub remove_connection {
    my ($self, $conn, $cb) = @_;

    my $id = $conn;
    $id = $conn->id if blessed $conn;

    delete $self->{connections}{$id};

    # Pull the reverse index entry out first, then use its room names to
    # clear this id from each room's member table.
    my $joined = (delete $self->{revrooms}{$id}) || {};
    delete $self->{rooms}{$_}{$id} for keys %{$joined};

    warn "Removed connection '" . $id . "'\n" if DEBUG;

    return $cb->() if $cb;
}
69
# Add a connection to a room.
#
# $room is the room name.  $conn may be a blessed object (its 'id' method
# supplies the key) or a plain connection id.  The connection is looked up
# in the pool's 'connections' table, and that stored object is what gets
# recorded in both the room's member table and the connection's reverse
# index.
#
# Returns the pooled connection, or undef when no connection is registered
# under that id.  In the latter case nothing is recorded, so unknown ids
# cannot leave undef placeholders behind in the room tables.
sub room_join {
    my ($self, $room, $conn) = @_;

    my $id = blessed $conn ? $conn->id : $conn;
    my $member = defined $id ? $self->{connections}{$id} : undef;

    # Unknown connection: report failure the same way as before (undef)
    # but do not touch the room tables.
    return $member unless defined $member;

    $self->{rooms}{$room}{$id}    = $member;
    $self->{revrooms}{$id}{$room} = $member;

    return $member;
}
82
# Remove a connection from a room, optionally together with its subrooms.
#
# $room is the room name.  $conn may be a blessed object (its 'id' method
# supplies the key) or a plain connection id.  When $subrooms is true,
# every room the connection has joined whose name starts with $room is
# left; this is a plain prefix match, so it covers $room itself.
# Otherwise only $room is left.
#
# Returns $conn exactly as it was passed in.
sub room_leave {
    my ($self, $room, $conn, $subrooms) = @_;

    my $id = blessed $conn ? $conn->id : $conn;

    # Work out which room names to drop, then delete them in one pass.
    my @leaving;
    if ($subrooms) {
        warn "Deleting '$id' subrooms of '$room'\n" if DEBUG;
        @leaving = grep { $_ =~ /^\Q$room\E/ } keys %{$self->{revrooms}{$id}};
    }
    else {
        warn "Deleting just '$id' room '$room'\n" if DEBUG;
        @leaving = ($room);
    }

    foreach my $name (@leaving) {
        delete $self->{rooms}{$name}{$id};
        delete $self->{revrooms}{$id}{$name};
    }

    return $conn;
}
107
# Deliver a message described by named arguments.
#
# Recognised keys:
#   id      - deliver to the single connection registered under this id
#   bytes   - with 'id': passed to the connection's 'write' method
#   message - with 'id' and no 'bytes': passed to the connection's 'send'
#             method; otherwise passed to each recipient's socket 'send'
#   room    - without 'id': restrict recipients to members of this room
#   invoker - without 'id': a connection object or a plain connection id
#             that must not receive the message
#
# With 'id' the return value is the connection found (undef when there is
# none, in which case nothing is sent).  Otherwise the message goes to
# every recipient that is a blessed object reporting is_connected, and the
# pool itself is returned.
sub send_raw {
    my $self = shift;
    my $msg  = {@_};

    if (defined $msg->{id}) {

        # Message directly to a connection.
        my $conn = $self->find_local_connection($msg->{id});
        if (defined $conn) {

            # Send the message here and now.
            DEBUG && warn "Sending message to $msg->{id}\n";
            if (defined $msg->{bytes}) {
                $conn->write($msg->{bytes});
            }
            else {
                $conn->send($msg->{message});
            }
        }
        return $conn;
    }

    # The "|| {}" keeps a message addressed to an unknown room from
    # creating an empty entry for that room as a side effect.
    my @members =
      defined $msg->{room}
      ? values %{$self->{rooms}{$msg->{room}} || {}}
      : $self->_connections;

    # Accept the invoker as an object or as a plain id, like the other
    # methods of this class do, and resolve it once rather than per member.
    my $invoker = $msg->{invoker};
    my $skip_id = blessed($invoker) ? $invoker->id : $invoker;

    foreach my $conn (@members) {
        next unless blessed $conn && $conn->is_connected;
        next if defined $skip_id && $conn->id eq $skip_id;

        DEBUG && warn "Sending message to " . $conn->id . "\n";
        $conn->socket->send($msg->{message});
    }

    return $self;
}
145
# Send a message to the pool: forwards the first argument to send_raw as
# its 'message' and returns send_raw's result.  Further arguments are
# ignored.
sub send {
    my ($self, $message) = @_;

    return $self->send_raw(message => $message);
}
151
# Send a message on behalf of $invoker: forwards to send_raw with the
# second argument as 'message' and the first as 'invoker', and returns
# send_raw's result.
sub broadcast {
    my ($self, $invoker, $message) = @_;

    return $self->send_raw(message => $message, invoker => $invoker);
}
158
# Return the values of the 'connections' table, i.e. every connection
# currently registered in the pool, in no particular order.
sub _connections {
    my ($self) = @_;

    return values %{ $self->{connections} };
}
164
# Construct a PocketIO::Connection wired to this pool.
#
# Caller arguments are passed through first; 'pool', 'on_connect_failed'
# and 'on_reconnect_failed' follow them in the constructor's argument list.
# Both failure callbacks remove the connection from the pool; the reconnect
# one first calls the connection's 'disconnected' method.
sub _build_connection {
    my ($self, @args) = @_;

    my $on_connect_failed = sub { $self->remove_connection(@_) };

    my $on_reconnect_failed = sub {
        my ($conn) = @_;

        $conn->disconnected;

        $self->remove_connection($conn);
    };

    return PocketIO::Connection->new(
        @args,
        pool                => $self,
        on_connect_failed   => $on_connect_failed,
        on_reconnect_failed => $on_reconnect_failed,
    );
}
181
1821;
183__END__
184
185=head1 NAME
186
187PocketIO::Pool - Connection pool
188
189=head1 DESCRIPTION
190
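L<PocketIO::Pool> is a connection pool. It keeps every
L<PocketIO::Connection> it creates indexed by connection id, tracks which
rooms each connection has joined, and delivers messages to a single
connection, to the members of a room, or to all connected members of the
pool.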
192
193=head1 METHODS
194
195=head2 C<new>
196
197=head2 C<find_connection>
198
199=head2 C<add_connection>
200
201=head2 C<remove_connection>
202
203=head2 C<connections>
204
205=head2 C<send>
206
207=head2 C<broadcast>
208
209=cut
210