1<?php
2/*
3 * vim:set softtabstop=4 shiftwidth=4 expandtab:
4 *
5 * LICENSE: GNU Affero General Public License, version 3 (AGPL-3.0-or-later)
6 * Copyright 2001 - 2020 Ampache.org
7 *
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Affero General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU Affero General Public License for more details.
17 *
18 * You should have received a copy of the GNU Affero General Public License
19 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
20 *
21 */
22
23declare(strict_types=0);
24
25namespace Ampache\Module\Broadcast;
26
27use Ampache\Config\AmpConfig;
28use Ampache\Repository\Model\Broadcast;
29use Ampache\Module\System\Core;
30use Exception;
31use Ratchet\MessageComponentInterface;
32use Ratchet\ConnectionInterface;
33use Ampache\Module\System\Session;
34use Ampache\Module\Playback\Stream;
35use Ampache\Module\Playback\Stream_Playlist;
36use Ampache\Module\Playback\WebPlayer;
37
/**
 * WebSocket message component relaying playback state from a broadcaster
 * connection to the listener connections registered on the same broadcast.
 *
 * Messages are exchanged as `CODE:value;` tuples; several tuples may be
 * concatenated in a single frame.
 */
class Broadcast_Server implements MessageComponentInterface
{
    const BROADCAST_SONG               = "SONG";
    const BROADCAST_SONG_POSITION      = "SONG_POSITION";
    const BROADCAST_PLAYER_PLAY        = "PLAYER_PLAY";
    const BROADCAST_REGISTER_BROADCAST = "REGISTER_BROADCAST";
    const BROADCAST_REGISTER_LISTENER  = "REGISTER_LISTENER";
    const BROADCAST_ENDED              = "ENDED";
    const BROADCAST_INFO               = "INFO";
    const BROADCAST_NB_LISTENERS       = "NB_LISTENERS";
    const BROADCAST_AUTH_SID           = "AUTH_SID";

    /**
     * When true, informational messages are echoed to standard output.
     * @var boolean $verbose
     */
    public $verbose;
    /**
     * Open connections, keyed by connection resource id.
     * @var ConnectionInterface[] $clients
     */
    protected $clients;
    /**
     * Validated stream session ids, keyed by connection resource id.
     * @var string[] $sids
     */
    protected $sids;
    /**
     * Listener connections, grouped by broadcast id.
     * @var array<int, ConnectionInterface[]> $listeners
     */
    protected $listeners;
    /**
     * Running broadcasts, keyed by the broadcaster's connection resource id.
     * @var Broadcast[] $broadcasters
     */
    protected $broadcasters;

    public function __construct()
    {
        $this->verbose      = false;
        $this->clients      = array();
        $this->sids         = array();
        $this->listeners    = array();
        $this->broadcasters = array();
    }

    /**
     * Remember a newly opened connection.
     *
     * @param ConnectionInterface $conn
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $this->clients[$conn->resourceId] = $conn;
    }

    /**
     * Parse an incoming frame and dispatch every `CODE:value` command in it.
     *
     * @param ConnectionInterface $from
     * @param string $msg
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        $commands = explode(';', (string)$msg);
        foreach ($commands as $command) {
            $command = trim((string)$command);
            if (empty($command)) {
                continue;
            }

            // Limit to 2 parts so the value itself may contain ':'
            $cmdinfo = explode(':', $command, 2);
            if (count($cmdinfo) != 2) {
                self::echo_message($this->verbose,
                    "[" . time() . "][error]Wrong message format (" . $command . ")." . "\r\n");
                continue;
            }

            switch ($cmdinfo[0]) {
                case self::BROADCAST_SONG:
                    $this->notifySong($from, (int)$cmdinfo[1]);
                    break;
                case self::BROADCAST_SONG_POSITION:
                    $this->notifySongPosition($from, (int)$cmdinfo[1]);
                    break;
                case self::BROADCAST_PLAYER_PLAY:
                    $this->notifyPlayerPlay($from, make_bool($cmdinfo[1]));
                    break;
                case self::BROADCAST_ENDED:
                    $this->notifyEnded($from);
                    break;
                case self::BROADCAST_REGISTER_BROADCAST:
                    // The key is documented as a string; casting it to int would
                    // turn any non-numeric key into 0 before the lookup.
                    $this->registerBroadcast($from, $cmdinfo[1]);
                    break;
                case self::BROADCAST_REGISTER_LISTENER:
                    $this->registerListener($from, (int)$cmdinfo[1]);
                    break;
                case self::BROADCAST_AUTH_SID:
                    $this->authSid($from, $cmdinfo[1]);
                    break;
                default:
                    self::echo_message($this->verbose,
                        "[" . time() . "][warning]Unknown message code." . "\r\n");
                    break;
            }
        }
    }

    /**
     * Build the web player media parameter for a single song.
     *
     * NOTE(review): assumes media_to_urlarray() yields at least one entry for
     * the given song id - confirm behaviour for unknown ids.
     *
     * @param integer $song_id
     * @return string
     */
    protected function getSongJS($song_id)
    {
        $media   = array();
        $media[] = array(
            'object_type' => 'song',
            'object_id' => $song_id
        );
        $item          = Stream_Playlist::media_to_urlarray($media);
        $transcode_cfg = AmpConfig::get('transcode');

        return WebPlayer::get_media_js_param($item[0], $transcode_cfg);
    }

    /**
     * Record the broadcaster's current song and push it to its listeners.
     *
     * @param ConnectionInterface $from
     * @param integer $song_id
     */
    protected function notifySong(ConnectionInterface $from, $song_id)
    {
        if (!$this->isBroadcaster($from)) {
            debug_event(self::class, 'Action unauthorized.', 3);

            return;
        }

        $broadcast = $this->broadcasters[$from->resourceId];
        $clients   = $this->getListeners($broadcast);

        Session::extend(Stream::get_session(), 'stream');

        $broadcast->update_song($song_id);
        $this->broadcastMessage($clients, self::BROADCAST_SONG, base64_encode($this->getSongJS($song_id)));

        self::echo_message($this->verbose,
            "[" . time() . "][info]Broadcast " . $broadcast->id . " now playing song " . $song_id . "." . "\r\n");
    }

    /**
     * Track the broadcaster's song position; listeners are only told about it
     * when it moved by more than 2 from the last known position.
     *
     * @param ConnectionInterface $from
     * @param integer $song_position
     */
    protected function notifySongPosition(ConnectionInterface $from, $song_position)
    {
        if (!$this->isBroadcaster($from)) {
            debug_event(self::class, 'Action unauthorized.', 3);

            return;
        }

        $broadcast = $this->broadcasters[$from->resourceId];
        $seekdiff  = $broadcast->song_position - $song_position;
        if ($seekdiff > 2 || $seekdiff < -2) {
            $clients = $this->getListeners($broadcast);
            $this->broadcastMessage($clients, self::BROADCAST_SONG_POSITION, $song_position);
        }
        $broadcast->song_position = $song_position;

        self::echo_message($this->verbose,
            "[" . time() . "][info]Broadcast " . $broadcast->id . " has song position to " . $song_position . "." . "\r\n");
    }

    /**
     * Forward the broadcaster's play/pause state to its listeners.
     *
     * @param ConnectionInterface $from
     * @param boolean $play
     */
    protected function notifyPlayerPlay(ConnectionInterface $from, $play)
    {
        if (!$this->isBroadcaster($from)) {
            debug_event(self::class, 'Action unauthorized.', 3);

            return;
        }

        $broadcast = $this->broadcasters[$from->resourceId];
        $clients   = $this->getListeners($broadcast);
        // Spell the state out: concatenating a bool prints "1" or nothing at all
        $state     = $play ? 'true' : 'false';
        $this->broadcastMessage($clients, self::BROADCAST_PLAYER_PLAY, $state);

        self::echo_message($this->verbose,
            "[" . time() . "][info]Broadcast " . $broadcast->id . " player state: " . $state . "." . "\r\n");
    }

    /**
     * Tell the listeners that the broadcast has ended.
     *
     * @param ConnectionInterface $from
     */
    protected function notifyEnded(ConnectionInterface $from)
    {
        if (!$this->isBroadcaster($from)) {
            debug_event(self::class, 'Action unauthorized.', 3);

            return;
        }

        $broadcast = $this->broadcasters[$from->resourceId];
        $clients   = $this->getListeners($broadcast);
        $this->broadcastMessage($clients, self::BROADCAST_ENDED);

        self::echo_message($this->verbose,
            "[" . time() . "][info]Broadcast " . $broadcast->id . " ended." . "\r\n");
    }

    /**
     * Mark a connection as the broadcaster of the broadcast matching the key.
     * Nothing happens when no broadcast matches.
     *
     * @param ConnectionInterface $from
     * @param string $broadcast_key
     */
    protected function registerBroadcast(ConnectionInterface $from, $broadcast_key)
    {
        $broadcast = Broadcast::get_broadcast($broadcast_key);
        if ($broadcast) {
            $this->broadcasters[$from->resourceId] = $broadcast;
            $this->listeners[$broadcast->id]       = array();

            self::echo_message($this->verbose, "[info]Broadcast " . $broadcast->id . " registered." . "\r\n");
        }
    }

    /**
     * Stop the broadcast owned by a connection: notify its listeners, flag the
     * broadcast as stopped and forget both the broadcast and its listeners.
     * The connection must currently be a broadcaster.
     *
     * @param ConnectionInterface $conn
     */
    protected function unregisterBroadcast(ConnectionInterface $conn)
    {
        $broadcast = $this->broadcasters[$conn->resourceId];
        $clients   = $this->getListeners($broadcast);
        $this->broadcastMessage($clients, self::BROADCAST_ENDED);
        $broadcast->update_state(false);

        unset($this->listeners[$broadcast->id]);
        unset($this->broadcasters[$conn->resourceId]);

        self::echo_message($this->verbose,
            "[" . time() . "][info]Broadcast " . $broadcast->id . " unregistered." . "\r\n");
    }

    /**
     * getRunningBroadcast
     * Find the first registered broadcast with the given id.
     *
     * @param integer $broadcast_id
     * @return Broadcast|null null when no such broadcast is running
     */
    protected function getRunningBroadcast($broadcast_id)
    {
        foreach ($this->broadcasters as $broadcast) {
            if ($broadcast->id == $broadcast_id) {
                return $broadcast;
            }
        }

        return null;
    }

    /**
     * Attach a connection as listener of a running broadcast and send it the
     * current song and position. Private broadcasts require a validated
     * session when 'require_session' is enabled.
     *
     * @param ConnectionInterface $from
     * @param integer $broadcast_id
     */
    protected function registerListener(ConnectionInterface $from, $broadcast_id)
    {
        $broadcast = $this->getRunningBroadcast($broadcast_id);
        if ($broadcast === null) {
            // Unknown or already stopped broadcast: nothing to attach to
            debug_event(self::class, 'Broadcast ' . $broadcast_id . ' is not running.', 3);

            return;
        }

        $authorized = !$broadcast->is_private
            || !AmpConfig::get('require_session')
            || (isset($this->sids[$from->resourceId]) && Session::exists('stream', $this->sids[$from->resourceId]));
        if (!$authorized) {
            debug_event(self::class, 'Listener unauthorized.', 3);

            return;
        }

        $this->listeners[$broadcast->id][] = $from;

        // Send current song and song position to the new listener only
        $this->broadcastMessage(array($from), self::BROADCAST_SONG,
            base64_encode($this->getSongJS($broadcast->song)));
        $this->broadcastMessage(array($from), self::BROADCAST_SONG_POSITION, $broadcast->song_position);
        $this->notifyNbListeners($broadcast);

        self::echo_message($this->verbose, "[info]New listener on broadcast " . $broadcast->id . "." . "\r\n");
    }

    /**
     * Remember the session id of a connection if it is an existing stream session.
     *
     * @param ConnectionInterface $conn
     * @param string $sid
     */
    protected function authSid(ConnectionInterface $conn, $sid)
    {
        if (Session::exists('stream', $sid)) {
            $this->sids[$conn->resourceId] = $sid;
        } else {
            self::echo_message($this->verbose, "Wrong listener session " . $sid . "\r\n");
        }
    }

    /**
     * Detach a connection from the first broadcast it is listening to and
     * refresh that broadcast's listener count.
     *
     * @param ConnectionInterface $conn
     */
    protected function unregisterListener(ConnectionInterface $conn)
    {
        foreach ($this->listeners as $broadcast_id => $brlisteners) {
            // Strict search + explicit false check: index 0 is a valid position
            $lindex = array_search($conn, $brlisteners, true);
            if ($lindex === false) {
                continue;
            }

            unset($this->listeners[$broadcast_id][$lindex]);
            self::echo_message($this->verbose,
                "[info]Listener left broadcast " . $broadcast_id . "." . "\r\n");

            $broadcast = $this->getRunningBroadcast($broadcast_id);
            if ($broadcast !== null) {
                $this->notifyNbListeners($broadcast);
            }

            break;
        }
    }

    /**
     * Store the current listener count on the broadcast and send it to the
     * listeners and to the broadcaster.
     *
     * @param Broadcast $broadcast
     */
    protected function notifyNbListeners(Broadcast $broadcast)
    {
        // Strict search + explicit false check: do not rely on key truthiness
        $broadcaster_id = array_search($broadcast, $this->broadcasters, true);
        if ($broadcaster_id === false) {
            return;
        }

        $clients      = $this->getListeners($broadcast);
        $nb_listeners = count($clients);
        // The broadcaster gets the count too, but is not counted as a listener
        if (isset($this->clients[$broadcaster_id])) {
            $clients[] = $this->clients[$broadcaster_id];
        }

        $broadcast->update_listeners($nb_listeners);
        $this->broadcastMessage($clients, self::BROADCAST_NB_LISTENERS, $nb_listeners);
    }

    /**
     * Listener connections of a broadcast; empty when none are registered.
     *
     * @param Broadcast $broadcast
     * @return ConnectionInterface[]
     */
    protected function getListeners(Broadcast $broadcast)
    {
        return $this->listeners[$broadcast->id] ?? array();
    }

    /**
     * Whether the connection has registered a broadcast.
     *
     * @param ConnectionInterface $conn
     * @return boolean
     */
    protected function isBroadcaster(ConnectionInterface $conn)
    {
        return array_key_exists($conn->resourceId, $this->broadcasters);
    }

    /**
     * Send a `cmd:value;` message to every given client, extending the stream
     * session of the clients that authenticated with one.
     *
     * @param ConnectionInterface[] $clients
     * @param string $cmd
     * @param string $value
     */
    protected function broadcastMessage($clients, $cmd, $value = '')
    {
        $msg = $cmd . ':' . $value . ';';
        foreach ($clients as $client) {
            // Clients that never sent AUTH_SID have no entry here
            $sid = $this->sids[$client->resourceId] ?? null;
            if ($sid) {
                Session::extend($sid, 'stream');
            }
            $client->send($msg);
        }
    }

    /**
     * Clean up every trace of a closed connection.
     *
     * @param ConnectionInterface $conn
     */
    public function onClose(ConnectionInterface $conn)
    {
        if ($this->isBroadcaster($conn)) {
            $this->unregisterBroadcast($conn);
        } else {
            $this->unregisterListener($conn);
        }

        unset($this->clients[$conn->resourceId]);
        unset($this->sids[$conn->resourceId]);
    }

    /**
     * onError
     * Log the error and close the faulty connection.
     *
     * @param ConnectionInterface $conn
     * @param Exception $error
     * @noinspection PhpParameterNameChangedDuringInheritanceInspection
     */
    public function onError(ConnectionInterface $conn, Exception $error)
    {
        debug_event(self::class, 'Broadcast error: ' . $error->getMessage(), 1);
        $conn->close();
    }

    /**
     * get_address
     * Broadcast endpoint URL: the configured 'websocket_address', or
     * ws://<SERVER_NAME>:8100 when none is configured, followed by '/broadcast'.
     *
     * @return string
     */
    public static function get_address()
    {
        $websocket_address = AmpConfig::get('websocket_address');
        if (empty($websocket_address)) {
            $websocket_address = 'ws://' . Core::get_server('SERVER_NAME') . ':8100';
        }

        return $websocket_address . '/broadcast';
    }

    /**
     * echo_message
     * Print a message to standard output, only when verbose mode is on.
     *
     * @param boolean $verbose
     * @param string $message
     */
    private static function echo_message($verbose, $message)
    {
        if ($verbose) {
            echo $message;
        }
    }
}
459