<lambda>null1 package io.casey.musikcube.remote.service.playback.impl.remote
2 
3 import android.os.Handler
4 import android.os.Looper
5 import io.casey.musikcube.remote.Application
6 import io.casey.musikcube.remote.injection.DaggerServiceComponent
7 import io.casey.musikcube.remote.service.playback.IPlaybackService
8 import io.casey.musikcube.remote.service.playback.PlaybackState
9 import io.casey.musikcube.remote.service.playback.QueryContext
10 import io.casey.musikcube.remote.service.playback.RepeatMode
11 import io.casey.musikcube.remote.service.websocket.Messages
12 import io.casey.musikcube.remote.service.websocket.SocketMessage
13 import io.casey.musikcube.remote.service.websocket.WebSocketService
14 import io.casey.musikcube.remote.service.websocket.model.IMetadataProxy
15 import io.casey.musikcube.remote.service.websocket.model.ITrack
16 import io.casey.musikcube.remote.service.websocket.model.ITrackListQueryFactory
17 import io.casey.musikcube.remote.service.websocket.model.impl.remote.RemoteTrack
18 import io.reactivex.Observable
19 import org.json.JSONObject
20 import java.util.*
21 import javax.inject.Inject
22 
23 class RemotePlaybackService : IPlaybackService {
24     private interface Key {
25         companion object {
26             const val STATE = "state"
27             const val REPEAT_MODE = "repeat_mode"
28             const val VOLUME = "volume"
29             const val SHUFFLED = "shuffled"
30             const val MUTED = "muted"
31             const val PLAY_QUEUE_COUNT = "track_count"
32             const val PLAY_QUEUE_POSITION = "play_queue_position"
33             const val PLAYING_DURATION = "playing_duration"
34             const val PLAYING_CURRENT_TIME = "playing_current_time"
35             const val PLAYING_TRACK = "playing_track"
36         }
37     }
38 
39     /**
40      * an annoying little class that maintains and updates state that estimates
41      * the currently playing time. remember, here we're a remote control, so we
42      * don't know the exact position of the play head! we update every 5 seconds
43      * and estimate.
44      */
45     private class EstimatedPosition {
46         var lastTime = 0.0
47         var pauseTime = 0.0
48         var trackId: Long = -1
49         var queryTime: Long = 0
50 
51         fun get(track: JSONObject?): Double {
52             if (track != null && track.optLong(Metadata.Track.ID, -1L) == trackId && trackId != -1L) {
53                 return if (pauseTime != 0.0) pauseTime else estimatedTime()
54             }
55             return 0.0
56         }
57 
58         fun update(message: SocketMessage) {
59             queryTime = System.nanoTime()
60             lastTime = message.getDoubleOption(Messages.Key.PLAYING_CURRENT_TIME, 0.0)
61             trackId = message.getLongOption(Messages.Key.ID, -1)
62         }
63 
64         fun pause() {
65             pauseTime = estimatedTime()
66         }
67 
68         fun resume() {
69             lastTime = pauseTime
70             queryTime = System.nanoTime()
71             pauseTime = 0.0
72         }
73 
74         fun update(time: Double, id: Long) {
75             queryTime = System.nanoTime()
76             lastTime = time
77             trackId = id
78 
79             if (pauseTime != 0.0) {
80                 pauseTime = time /* ehh... */
81             }
82         }
83 
84         fun reset() {
85             pauseTime = 0.0
86             lastTime = pauseTime
87             queryTime = System.nanoTime()
88             trackId = -1
89         }
90 
91         fun estimatedTime(): Double {
92             val diff = System.nanoTime() - queryTime
93             val seconds = diff.toDouble() / NANOSECONDS_PER_SECOND
94             return lastTime + seconds
95         }
96     }
97 
98     @Inject lateinit var wss: WebSocketService
99     @Inject lateinit var metadataProxy: IMetadataProxy
100 
101     private val handler = Handler(Looper.getMainLooper())
102     private val listeners = HashSet<() -> Unit>()
103     private val estimatedTime = EstimatedPosition()
104 
105     override var state = PlaybackState.Stopped
106         private set
107 
108     override val currentTime: Double
109         get() = estimatedTime.get(track)
110 
111     override var repeatMode: RepeatMode = RepeatMode.None
112         private set
113 
114     override var shuffled: Boolean = false
115         private set
116 
117     override var muted: Boolean = false
118         private set
119 
120     override var volume: Double = 0.0
121         private set
122 
123     override var queueCount: Int = 0
124         private set
125 
126     override var queuePosition: Int = 0
127         private set
128 
129     override var duration: Double = 0.0
130         private set
131 
132     private var track: JSONObject = JSONObject()
133 
134     init {
135         DaggerServiceComponent.builder()
136             .appComponent(Application.appComponent)
137             .build().inject(this)
138 
139         reset()
140     }
141 
142     override fun playAll() {
143         playAll(0, "")
144     }
145 
146     override fun playAll(index: Int, filter: String) {
147         wss.send(SocketMessage.Builder
148             .request(Messages.Request.PlayAllTracks)
149             .addOption(Messages.Key.INDEX, index)
150             .addOption(Messages.Key.FILTER, filter)
151             .build())
152     }
153 
154     override fun play(category: String, categoryId: Long, index: Int, filter: String) {
155         wss.send(SocketMessage.Builder
156             .request(Messages.Request.PlayTracksByCategory)
157             .addOption(Messages.Key.CATEGORY, category)
158             .addOption(Messages.Key.ID, categoryId)
159             .addOption(Messages.Key.INDEX, index)
160             .addOption(Messages.Key.FILTER, filter)
161             .build())
162     }
163 
164     override fun playFrom(service: IPlaybackService) {
165         service.queryContext?.let {qc ->
166             val time = service.currentTime
167             val index = service.queuePosition
168 
169             when (qc.type) {
170                 Messages.Request.PlaySnapshotTracks -> {
171                     wss.send(SocketMessage.Builder
172                         .request(Messages.Request.PlaySnapshotTracks)
173                         .addOption(Messages.Key.TIME, time)
174                         .addOption(Messages.Key.INDEX, index)
175                         .build())
176                 }
177                 Messages.Request.QueryTracks,
178                 Messages.Request.QueryTracksByCategory -> {
179                     wss.send(SocketMessage.Builder
180                         .request(Messages.Request.PlayTracksByCategory)
181                         .addOption(Messages.Key.CATEGORY, qc.category)
182                         .addOption(Messages.Key.ID, qc.categoryId)
183                         .addOption(Messages.Key.FILTER, qc.filter)
184                         .addOption(Messages.Key.TIME, time)
185                         .addOption(Messages.Key.INDEX, index)
186                         .build())
187                 }
188                 else -> { }
189             }
190         }
191     }
192 
193     override fun prev() {
194         wss.send(SocketMessage.Builder.request(Messages.Request.Previous).build())
195     }
196 
197     override fun pauseOrResume() {
198         wss.send(SocketMessage.Builder.request(Messages.Request.PauseOrResume).build())
199     }
200 
201     override fun pause() {
202         if (state != PlaybackState.Paused) {
203             pauseOrResume()
204         }
205     }
206 
207     override fun resume() {
208         if (state != PlaybackState.Playing) {
209             pauseOrResume()
210         }
211     }
212 
213     override fun stop() {
214         /* nothing for now */
215     }
216 
217     override fun next() {
218         wss.send(SocketMessage.Builder.request(Messages.Request.Next).build())
219     }
220 
221     override fun playAt(index: Int) {
222         wss.send(SocketMessage
223             .Builder.request(Messages.Request.PlayAtIndex)
224             .addOption(Messages.Key.INDEX, index)
225             .build())
226     }
227 
228     override fun volumeUp() {
229         wss.send(SocketMessage.Builder
230             .request(Messages.Request.SetVolume)
231             .addOption(Messages.Key.RELATIVE, Messages.Value.UP)
232             .build())
233     }
234 
235     override fun volumeDown() {
236         wss.send(SocketMessage.Builder
237             .request(Messages.Request.SetVolume)
238             .addOption(Messages.Key.RELATIVE, Messages.Value.DOWN)
239             .build())
240     }
241 
242     override fun seekForward() {
243         wss.send(SocketMessage.Builder
244             .request(Messages.Request.SeekRelative)
245             .addOption(Messages.Key.DELTA, 5.0f).build())
246     }
247 
248     override fun seekBackward() {
249         wss.send(SocketMessage.Builder
250             .request(Messages.Request.SeekRelative)
251             .addOption(Messages.Key.DELTA, -5.0f).build())
252     }
253 
254     override fun seekTo(seconds: Double) {
255         wss.send(SocketMessage.Builder
256             .request(Messages.Request.SeekTo)
257             .addOption(Messages.Key.POSITION, seconds).build())
258 
259         estimatedTime.update(seconds, estimatedTime.trackId)
260     }
261 
262     @Synchronized override fun connect(listener: () -> Unit) {
263         listeners.add(listener)
264 
265         if (listeners.size == 1) {
266             wss.addClient(client)
267             metadataProxy.attach()
268             scheduleTimeSyncMessage()
269         }
270     }
271 
272     @Synchronized override fun disconnect(listener: () -> Unit) {
273         listeners.remove(listener)
274 
275         if (listeners.size == 0) {
276             wss.removeClient(client)
277             metadataProxy.detach()
278             handler.removeCallbacks(syncTimeRunnable)
279         }
280     }
281 
282     override fun toggleShuffle() {
283         wss.send(SocketMessage.Builder
284             .request(Messages.Request.ToggleShuffle).build())
285     }
286 
287     override fun toggleMute() {
288         wss.send(SocketMessage.Builder
289             .request(Messages.Request.ToggleMute).build())
290     }
291 
292     override fun toggleRepeatMode() {
293         wss.send(SocketMessage.Builder
294             .request(Messages.Request.ToggleRepeat)
295             .build())
296     }
297 
298     override val bufferedTime: Double
299         get() = duration
300 
301     override val playingTrack: ITrack
302         get() = RemoteTrack(track)
303 
304     private fun reset() {
305         state = PlaybackState.Stopped
306         repeatMode = RepeatMode.None
307         muted = false
308         shuffled = muted
309         volume = 0.0
310         queuePosition = 0
311         queueCount = queuePosition
312         track = JSONObject()
313         estimatedTime.reset()
314     }
315 
316     private fun isPlaybackOverviewMessage(socketMessage: SocketMessage?): Boolean {
317         if (socketMessage == null) {
318             return false
319         }
320 
321         val name = socketMessage.name
322 
323         return Messages.Broadcast.PlaybackOverviewChanged.matches(name) ||
324             Messages.Request.GetPlaybackOverview.matches(name)
325     }
326 
327     private fun updatePlaybackOverview(message: SocketMessage?): Boolean {
328         if (message == null) {
329             reset()
330             return false
331         }
332 
333         if (!isPlaybackOverviewMessage(message)) {
334             throw IllegalArgumentException("invalid message!")
335         }
336 
337         state = PlaybackState.from(message.getStringOption(Key.STATE))
338 
339         when (state) {
340             PlaybackState.Paused -> estimatedTime.pause()
341             PlaybackState.Playing -> {
342                 estimatedTime.resume()
343                 scheduleTimeSyncMessage()
344             }
345             else -> { }
346         }
347 
348         repeatMode = RepeatMode.from(message.getStringOption(Key.REPEAT_MODE))
349         shuffled = message.getBooleanOption(Key.SHUFFLED)
350         muted = message.getBooleanOption(Key.MUTED)
351         volume = message.getDoubleOption(Key.VOLUME)
352         queueCount = message.getIntOption(Key.PLAY_QUEUE_COUNT)
353         queuePosition = message.getIntOption(Key.PLAY_QUEUE_POSITION)
354         duration = message.getDoubleOption(Key.PLAYING_DURATION)
355         track = message.getJsonObjectOption(Key.PLAYING_TRACK, JSONObject()) ?: JSONObject()
356 
357         estimatedTime.update(
358             message.getDoubleOption(Key.PLAYING_CURRENT_TIME, -1.0),
359             track.optLong(Metadata.Track.ID, -1))
360 
361         notifyStateUpdated()
362 
363         return true
364     }
365 
366     @Synchronized private fun notifyStateUpdated() {
367         for (listener in listeners) {
368             listener()
369         }
370     }
371 
372     private fun scheduleTimeSyncMessage() {
373         handler.removeCallbacks(syncTimeRunnable)
374 
375         if (state == PlaybackState.Playing) {
376             handler.postDelayed(syncTimeRunnable, SYNC_TIME_INTERVAL_MS)
377         }
378     }
379 
380     private val syncTimeRunnable = object: Runnable {
381         override fun run() {
382             if (wss.hasClient(client)) {
383                 wss.send(SocketMessage.Builder
384                     .request(Messages.Request.GetCurrentTime).build())
385             }
386         }
387     }
388 
389     override val queryContext: QueryContext
390         get() = QueryContext(Messages.Request.QueryPlayQueueTracks)
391 
392     override val playlistQueryFactory: ITrackListQueryFactory = object : ITrackListQueryFactory {
393         override fun count(): Observable<Int> = metadataProxy.getPlayQueueTracksCount()
394         override fun page(offset: Int, limit: Int): Observable<List<ITrack>> = metadataProxy.getPlayQueueTracks(limit, offset)
395         override fun offline(): Boolean  = false
396     }
397 
398     private val client = object : WebSocketService.Client {
399         override fun onStateChanged(newState: WebSocketService.State, oldState: WebSocketService.State) {
400             if (newState === WebSocketService.State.Connected) {
401                 wss.send(SocketMessage.Builder.request(
402                     Messages.Request.GetPlaybackOverview.toString()).build())
403             }
404             else if (newState === WebSocketService.State.Disconnected) {
405                 reset()
406                 notifyStateUpdated()
407             }
408         }
409 
410         override fun onMessageReceived(message: SocketMessage) {
411             if (isPlaybackOverviewMessage(message)) {
412                 updatePlaybackOverview(message)
413             }
414             else if (Messages.Request.GetCurrentTime.matches(message.name)) {
415                 estimatedTime.update(message)
416                 scheduleTimeSyncMessage()
417             }
418         }
419 
420         override fun onInvalidPassword() {
421         }
422     }
423 
424     companion object {
425         private const val NANOSECONDS_PER_SECOND = 1000000000.0
426         private const val SYNC_TIME_INTERVAL_MS = 5000L
427     }
428 }
429