1from dbus_next.service import ServiceInterface, dbus_property, method, signal, Variant
2from dbus_next import PropertyAccess, RequestNameReply, BusType
3from dbus_next.aio import MessageBus
4
5import asyncio
6
7
async def setup_mpris(*names, bus_address=None, system=False):
    """Start one mock MPRIS player per name, each on its own bus connection.

    For every entry in ``names`` a new ``MessageBus`` is connected, an
    ``MprisPlayer`` and an ``MprisRoot`` are exported at
    ``/org/mpris/MediaPlayer2`` and the well-known name
    ``org.mpris.MediaPlayer2.<name>`` is requested. The connections are
    set up concurrently.

    Args:
        names: Suffixes appended to ``org.mpris.MediaPlayer2.`` to form the
            bus name of each player.
        bus_address: Passed through to ``MessageBus``.
        system: Use ``BusType.SYSTEM`` when true, ``BusType.SESSION``
            otherwise.

    Returns:
        A list of ``MprisPlayer`` objects in the same order as ``names``.

    Raises:
        AssertionError: If a connection does not become the primary owner
            of its requested name.
    """
    # The bus type is the same for every player, so pick it once.
    bus_type = BusType.SYSTEM if system else BusType.SESSION

    # TODO maybe they should all share a bus for speed
    async def setup(name):
        bus = await MessageBus(bus_type=bus_type,
                               bus_address=bus_address).connect()
        player = MprisPlayer(bus)
        bus.export('/org/mpris/MediaPlayer2', player)
        bus.export('/org/mpris/MediaPlayer2', MprisRoot())
        bus_name = f'org.mpris.MediaPlayer2.{name}'
        reply = await bus.request_name(bus_name)
        # Raised explicitly instead of using an ``assert`` statement so the
        # check is still performed when Python runs with -O.
        if reply != RequestNameReply.PRIMARY_OWNER:
            # Do not leave the unusable connection open behind the error.
            bus.disconnect()
            raise AssertionError(
                f'could not become primary owner of {bus_name}: {reply!r}')
        return player

    players = await asyncio.gather(*(setup(name) for name in names))
    # One extra call to the bus daemon per player before handing them out.
    # NOTE(review): presumably this makes sure the bus has processed the
    # exports and name requests above - confirm.
    await asyncio.gather(*(p.ping() for p in players))
    return players
27
28
async def setup_playerctld(bus_address=None):
    """Start a mock playerctld on a new bus connection.

    Connects a ``MessageBus``, exports a ``PlayerctldInterface`` at
    ``/org/mpris/MediaPlayer2`` and requests the name
    ``org.mpris.MediaPlayer2.playerctld``.

    Args:
        bus_address: Passed through to ``MessageBus``.

    Returns:
        The exported ``PlayerctldInterface`` object.

    Raises:
        AssertionError: If the connection does not become the primary owner
            of the requested name.
    """
    bus = await MessageBus(bus_address=bus_address).connect()
    playerctld = PlayerctldInterface(bus)
    bus.export('/org/mpris/MediaPlayer2', playerctld)
    bus_name = 'org.mpris.MediaPlayer2.playerctld'
    reply = await bus.request_name(bus_name)
    # Raised explicitly instead of using an ``assert`` statement so the
    # check is still performed when Python runs with -O.
    if reply != RequestNameReply.PRIMARY_OWNER:
        # Do not leave the unusable connection open behind the error.
        bus.disconnect()
        raise AssertionError(
            f'could not become primary owner of {bus_name}: {reply!r}')
    return playerctld
36
37
class MprisRoot(ServiceInterface):
    """Mock ``org.mpris.MediaPlayer2`` interface with fixed property values.

    ``Raise`` and ``Quit`` accept the call and do nothing.
    """

    # Values handed out by the read-only properties below.
    _IDENTITY = 'playerctl test client'
    _URI_SCHEMES = ('file',)
    _MIME_TYPES = ('audio/mp3',)

    def __init__(self):
        super().__init__('org.mpris.MediaPlayer2')

    @method()
    def Raise(self):
        """Do nothing."""

    @method()
    def Quit(self):
        """Do nothing."""

    @dbus_property(access=PropertyAccess.READ)
    def CanRaise(self) -> 'b':
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ)
    def HasTrackList(self) -> 'b':
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ)
    def Identity(self) -> 's':
        """Fixed identity string of this mock."""
        return self._IDENTITY

    @dbus_property(access=PropertyAccess.READ)
    def SupportedUriSchemes(self) -> 'as':
        """Fixed one-element list of URI schemes."""
        # A new list on every read, as a list literal would give.
        return list(self._URI_SCHEMES)

    @dbus_property(access=PropertyAccess.READ)
    def SupportedMimeTypes(self) -> 'as':
        """Fixed one-element list of MIME types."""
        # A new list on every read, as a list literal would give.
        return list(self._MIME_TYPES)
69
70
class MprisPlayer(ServiceInterface):
    """Mock ``org.mpris.MediaPlayer2.Player`` that records the calls it gets.

    Each D-Bus method stores the fact that it was called (or the arguments
    it was called with) in a plain attribute. Each D-Bus property reads
    from, and where writable writes to, a plain attribute. ``reset()``
    restores all of those attributes to their defaults.
    """

    def __init__(self, bus):
        """Create the player on top of ``bus``, the connection used by
        ``ping()`` and ``disconnect()``."""
        super().__init__('org.mpris.MediaPlayer2.Player')
        self.bus = bus
        # Source of generated track ids; reset() leaves it alone.
        self.counter = 0
        self.reset()

    def reset(self):
        """Restore call records, property values and the signal payload."""
        # method calls
        self.next_called = self.previous_called = False
        self.pause_called = self.play_pause_called = False
        self.stop_called = self.play_called = False
        self.seek_called_with = None
        self.set_position_called_with = None
        self.open_uri_called_with = None

        # properties
        self.playback_status = 'Playing'
        self.loop_status = 'None'
        self.shuffle = False
        self.metadata = {}
        self.position = 0
        self.rate = self.volume = 1.0
        self.minimum_rate = self.maximum_rate = 1.0
        self.can_go_next = self.can_go_previous = True
        self.can_play = self.can_pause = True
        self.can_seek = self.can_control = True

        # signals
        self.seeked_value = 0

    async def ping(self):
        """Make one introspection call to the bus daemon and wait for it."""
        daemon_name = 'org.freedesktop.DBus'
        daemon_path = '/org/freedesktop/DBus'
        await self.bus.introspect(daemon_name, daemon_path)

    def _next_track_id(self):
        """Advance the counter and return it as an object path string."""
        self.counter += 1
        return f'/{self.counter}'

    async def _publish_metadata(self, metadata):
        """Store ``metadata``, announce the change, then ``ping()``."""
        self.metadata = metadata
        self.emit_properties_changed({'Metadata': self.metadata})
        await self.ping()

    async def set_artist_title(self, artist, title, track_id=None):
        """Replace the metadata with the given artist, title and track id.

        When ``track_id`` is ``None`` a new id is generated from the
        counter. The change is announced and followed by a ``ping()``.
        """
        if track_id is None:
            track_id = self._next_track_id()

        await self._publish_metadata({
            'xesam:title': Variant('s', title),
            'xesam:artist': Variant('as', [artist]),
            'mpris:trackid': Variant('o', track_id),
        })

    async def clear_metadata(self):
        """Replace the metadata with only a newly generated track id.

        The change is announced and followed by a ``ping()``.
        """
        await self._publish_metadata({
            'mpris:trackid': Variant('o', self._next_track_id()),
        })

    async def disconnect(self):
        """Disconnect the bus and wait until it has disconnected."""
        bus = self.bus
        bus.disconnect()
        await bus.wait_for_disconnect()

    @method()
    def Next(self):
        """Record that ``Next`` was called."""
        self.next_called = True

    @method()
    def Previous(self):
        """Record that ``Previous`` was called."""
        self.previous_called = True

    @method()
    def Pause(self):
        """Record that ``Pause`` was called."""
        self.pause_called = True

    @method()
    def PlayPause(self):
        """Record that ``PlayPause`` was called."""
        self.play_pause_called = True

    @method()
    def Stop(self):
        """Record that ``Stop`` was called."""
        self.stop_called = True

    @method()
    def Play(self):
        """Record that ``Play`` was called."""
        self.play_called = True

    @method()
    def Seek(self, offset: 'x'):
        """Record the offset ``Seek`` was called with."""
        self.seek_called_with = offset

    @method()
    def SetPosition(self, track_id: 'o', position: 'x'):
        """Record the ``(track_id, position)`` pair of the call."""
        self.set_position_called_with = (track_id, position)

    @method()
    def OpenUri(self, uri: 's'):
        """Record the uri ``OpenUri`` was called with."""
        self.open_uri_called_with = uri

    @signal()
    def Seeked(self) -> 'x':
        """Signal whose payload is the current ``seeked_value``."""
        return self.seeked_value

    @dbus_property(access=PropertyAccess.READ)
    def PlaybackStatus(self) -> 's':
        """Read-only view of ``playback_status``."""
        return self.playback_status

    @dbus_property()
    def LoopStatus(self) -> 's':
        """Readable and writable view of ``loop_status``."""
        return self.loop_status

    @LoopStatus.setter
    def LoopStatus(self, status: 's'):
        self.loop_status = status

    @dbus_property()
    def Rate(self) -> 'd':
        """Readable and writable view of ``rate``."""
        return self.rate

    @Rate.setter
    def Rate(self, rate: 'd'):
        self.rate = rate

    @dbus_property()
    def Shuffle(self) -> 'b':
        """Readable and writable view of ``shuffle``."""
        return self.shuffle

    @Shuffle.setter
    def Shuffle(self, shuffle: 'b'):
        self.shuffle = shuffle

    @dbus_property(access=PropertyAccess.READ)
    def Metadata(self) -> 'a{sv}':
        """Read-only view of ``metadata``."""
        return self.metadata

    @dbus_property()
    def Volume(self) -> 'd':
        """Readable and writable view of ``volume``."""
        return self.volume

    @Volume.setter
    def Volume(self, volume: 'd'):
        self.volume = volume

    @dbus_property(access=PropertyAccess.READ)
    def Position(self) -> 'x':
        """Read-only view of ``position``."""
        return self.position

    @dbus_property(access=PropertyAccess.READ)
    def MinimumRate(self) -> 'd':
        """Read-only view of ``minimum_rate``."""
        return self.minimum_rate

    @dbus_property(access=PropertyAccess.READ)
    def MaximumRate(self) -> 'd':
        """Read-only view of ``maximum_rate``."""
        return self.maximum_rate

    @dbus_property(access=PropertyAccess.READ)
    def CanGoNext(self) -> 'b':
        """Read-only view of ``can_go_next``."""
        return self.can_go_next

    @dbus_property(access=PropertyAccess.READ)
    def CanGoPrevious(self) -> 'b':
        """Read-only view of ``can_go_previous``."""
        return self.can_go_previous

    @dbus_property(access=PropertyAccess.READ)
    def CanPlay(self) -> 'b':
        """Read-only view of ``can_play``."""
        return self.can_play

    @dbus_property(access=PropertyAccess.READ)
    def CanPause(self) -> 'b':
        """Read-only view of ``can_pause``."""
        return self.can_pause

    @dbus_property(access=PropertyAccess.READ)
    def CanSeek(self) -> 'b':
        """Read-only view of ``can_seek``."""
        return self.can_seek

    @dbus_property(access=PropertyAccess.READ)
    def CanControl(self) -> 'b':
        """Read-only view of ``can_control``."""
        return self.can_control
259
260
class PlayerctldInterface(ServiceInterface):
    """Minimal ``com.github.altdesktop.playerctld`` mock for the tests.

    Exposes only the ``PlayerNames`` property, which reads from the
    ``player_names`` list.
    """

    def __init__(self, bus):
        """Create the interface on top of ``bus`` with no player names."""
        super().__init__('com.github.altdesktop.playerctld')
        self.player_names = []
        self.bus = bus

    @dbus_property(access=PropertyAccess.READ)
    def PlayerNames(self) -> 'as':
        """Read-only view of ``player_names``."""
        return self.player_names

    async def disconnect(self):
        """Disconnect the bus and wait until it has disconnected."""
        bus = self.bus
        bus.disconnect()
        await bus.wait_for_disconnect()
275