1import pytest
2
3import os
4from .mpris import setup_mpris
5from .playerctl import PlayerctlCli
6from dbus_next.aio import MessageBus
7from dbus_next import Message, MessageType
8
9import asyncio
10from asyncio import Queue
11from subprocess import run as run_process
12
13
14async def start_playerctld(bus_address, debug=False):
15    pkill = await asyncio.create_subprocess_shell('pkill playerctld')
16    await pkill.wait()
17    env = os.environ.copy()
18    env['DBUS_SESSION_BUS_ADDRESS'] = bus_address
19    env['G_MESSAGES_DEBUG'] = 'playerctl'
20    proc = await asyncio.create_subprocess_shell(
21        'playerctld',
22        env=env,
23        stdout=asyncio.subprocess.PIPE,
24        stderr=asyncio.subprocess.STDOUT)
25
26    async def printer(stream):
27        while True:
28            line = await stream.readline()
29            print(line)
30            if not line:
31                break
32
33    if debug:
34        asyncio.get_event_loop().create_task(printer(proc.stdout))
35
36    return proc
37
38
39async def get_playerctld(bus):
40    path = '/com/github/altdesktop/playerctld'
41    interface = 'com.github.altdesktop.playerctld'
42    introspection = await bus.introspect('org.mpris.MediaPlayer2.playerctld',
43                                         path)
44    obj = bus.get_proxy_object(interface, path, introspection)
45    return obj.get_interface('org.freedesktop.DBus.Properties')
46
47
48@pytest.mark.asyncio
49async def test_daemon_commands(bus_address):
50    playerctl = PlayerctlCli(bus_address)
51
52    async def run(cmd):
53        return await playerctl.run('-p playerctld ' + cmd)
54
55    # with no other players running, these should error because there's no
56    # active player (not no players found). This tests activation and property
57    # errors as well.
58    results = await asyncio.gather(*(run(cmd)
59                                     for cmd in ('play', 'pause', 'play-pause',
60                                                 'stop', 'next', 'previous',
61                                                 'position', 'volume',
62                                                 'status', 'metadata', 'loop',
63                                                 'shuffle')))
64    for result in results:
65        assert result.returncode == 1
66        assert 'No player could handle this command' in result.stderr.splitlines(
67        )
68
69    # restart playerctld so we can manage the process and see debug info
70    playerctld_proc = await start_playerctld(bus_address)
71
72    [mpris1, mpris2, mpris3] = await setup_mpris('daemon1',
73                                                 'daemon2',
74                                                 'daemon3',
75                                                 bus_address=bus_address)
76    await mpris2.set_artist_title('artist', 'title')
77    cmd = await run('play')
78    assert cmd.returncode == 0, cmd.stdout
79    assert mpris2.play_called, cmd.stdout
80    mpris2.reset()
81
82    await mpris1.set_artist_title('artist', 'title')
83    cmd = await run('play')
84    assert cmd.returncode == 0, cmd.stderr
85    assert mpris1.play_called
86    mpris1.reset()
87
88    await mpris3.set_artist_title('artist', 'title')
89    cmd = await run('play')
90    assert cmd.returncode == 0, cmd.stderr
91    assert mpris3.play_called
92    mpris3.reset()
93
94    await mpris3.disconnect()
95    cmd = await run('play')
96    assert cmd.returncode == 0, cmd.stderr
97    assert mpris1.play_called
98    mpris1.reset()
99
100    await asyncio.gather(mpris1.disconnect(), mpris2.disconnect())
101
102    playerctld_proc.terminate()
103    await playerctld_proc.wait()
104
105
106@pytest.mark.asyncio
107async def test_daemon_follow(bus_address):
108    playerctld_proc = await start_playerctld(bus_address)
109
110    [mpris1, mpris2] = await setup_mpris('player1',
111                                         'player2',
112                                         bus_address=bus_address)
113    playerctl = PlayerctlCli(bus_address)
114    pctl_cmd = '--player playerctld metadata --format "{{playerInstance}}: {{artist}} - {{title}}" --follow'
115    proc = await playerctl.start(pctl_cmd)
116
117    await mpris1.set_artist_title('artist1', 'title1')
118    line = await proc.queue.get()
119    assert line == 'playerctld: artist1 - title1', proc.queue
120
121    await mpris2.set_artist_title('artist2', 'title2')
122    line = await proc.queue.get()
123    assert line == 'playerctld: artist2 - title2', proc.queue
124
125    [mpris3] = await setup_mpris('player3', bus_address=bus_address)
126    await mpris3.set_artist_title('artist3', 'title3')
127    line = await proc.queue.get()
128
129    if line == '':
130        # the line might be blank here because of the test setup
131        line = await proc.queue.get()
132
133    assert line == 'playerctld: artist3 - title3', proc.queue
134
135    await mpris1.set_artist_title('artist4', 'title4')
136    line = await proc.queue.get()
137    assert line == 'playerctld: artist4 - title4', proc.queue
138
139    await mpris1.set_artist_title('artist5', 'title5')
140    line = await proc.queue.get()
141    assert line == 'playerctld: artist5 - title5', proc.queue
142
143    await mpris1.disconnect()
144    line = await proc.queue.get()
145    assert line == 'playerctld: artist3 - title3', proc.queue
146
147    await asyncio.gather(mpris2.disconnect(), mpris3.disconnect())
148
149    playerctld_proc.terminate()
150    proc.proc.terminate()
151    await proc.proc.wait()
152    await playerctld_proc.wait()
153
154
155async def playerctld_shift(bus_address, reverse=False):
156    env = os.environ.copy()
157    env['DBUS_SESSION_BUS_ADDRESS'] = bus_address
158    env['G_MESSAGES_DEBUG'] = 'playerctl'
159    cmd = 'playerctld unshift' if reverse else 'playerctld shift'
160    shift = await asyncio.create_subprocess_shell(
161        cmd,
162        env=env,
163        stdout=asyncio.subprocess.PIPE,
164        stderr=asyncio.subprocess.STDOUT)
165    return await shift.wait()
166
167
168@pytest.mark.asyncio
169async def test_daemon_shift_simple(bus_address):
170    playerctld_proc = await start_playerctld(bus_address)
171
172    mprises = await setup_mpris('player1',
173                                'player2',
174                                'player3',
175                                bus_address=bus_address)
176    [mpris1, mpris2, mpris3] = mprises
177
178    playerctl = PlayerctlCli(bus_address)
179    pctl_cmd = '--player playerctld metadata --format "{{playerInstance}}: {{artist}} - {{title}}" --follow'
180    proc = await playerctl.start(pctl_cmd)
181
182    await mpris1.set_artist_title('artist1', 'title1')
183    line = await proc.queue.get()
184    assert line == 'playerctld: artist1 - title1', proc.queue
185
186    await mpris2.set_artist_title('artist2', 'title2')
187    line = await proc.queue.get()
188    assert line == 'playerctld: artist2 - title2', proc.queue
189
190    await mpris3.set_artist_title('artist3', 'title3')
191    line = await proc.queue.get()
192    assert line == 'playerctld: artist3 - title3', proc.queue
193
194    code = await playerctld_shift(bus_address)
195    assert code == 0
196    line = await proc.queue.get()
197    assert line == 'playerctld: artist2 - title2', proc.queue
198
199    code = await playerctld_shift(bus_address)
200    assert code == 0
201    line = await proc.queue.get()
202    assert line == 'playerctld: artist1 - title1', proc.queue
203
204    code = await playerctld_shift(bus_address, reverse=True)
205    assert code == 0
206    line = await proc.queue.get()
207    assert line == 'playerctld: artist2 - title2', proc.queue
208
209    code = await playerctld_shift(bus_address, reverse=True)
210    assert code == 0
211    line = await proc.queue.get()
212    assert line == 'playerctld: artist3 - title3', proc.queue
213
214    playerctld_proc.terminate()
215    proc.proc.terminate()
216    await asyncio.gather(mpris1.disconnect(), mpris2.disconnect(),
217                         playerctld_proc.wait(), proc.proc.wait())
218
219
220@pytest.mark.asyncio
221async def test_daemon_shift_no_player(bus_address):
222    playerctld_proc = await start_playerctld(bus_address)
223
224    playerctl = PlayerctlCli(bus_address)
225    pctl_cmd = '--player playerctld metadata --format "{{playerInstance}}: {{artist}} - {{title}}" --follow'
226    proc = await playerctl.start(pctl_cmd)
227
228    code = await playerctld_shift(bus_address)
229    assert code == 1
230
231    [mpris1] = await setup_mpris('player1', bus_address=bus_address)
232    code = await playerctld_shift(bus_address)
233    assert code == 0
234
235    await mpris1.disconnect()
236    code = await playerctld_shift(bus_address)
237    assert code == 1
238
239    code = await playerctld_shift(bus_address, reverse=True)
240    assert code == 1
241
242    [mpris1] = await setup_mpris('player1', bus_address=bus_address)
243    code = await playerctld_shift(bus_address, reverse=True)
244    assert code == 0
245
246    await mpris1.disconnect()
247    code = await playerctld_shift(bus_address, reverse=True)
248    assert code == 1
249
250    playerctld_proc.terminate()
251    await playerctld_proc.wait()
252
253
254@pytest.mark.asyncio
255async def test_active_player_change(bus_address):
256    queue = Queue()
257    playerctld_proc = await start_playerctld(bus_address)
258
259    bus = await MessageBus(bus_address=bus_address).connect()
260
261    reply = await bus.call(
262        Message(destination='org.freedesktop.DBus',
263                interface='org.freedesktop.DBus',
264                path='/org/freedesktop/DBus',
265                member='AddMatch',
266                signature='s',
267                body=["sender='org.mpris.MediaPlayer2.playerctld'"]))
268
269    assert reply.message_type == MessageType.METHOD_RETURN, reply.body
270
271    def message_handler(message):
272        if message.member == 'PropertiesChanged' and message.body[
273                0] == 'com.github.altdesktop.playerctld' and 'PlayerNames' in message.body[
274                    1]:
275            queue.put_nowait(message.body[1]['PlayerNames'].value)
276
277    def player_list(*args):
278        return [f'org.mpris.MediaPlayer2.{name}' for name in args]
279
280    bus.add_message_handler(message_handler)
281
282    [mpris1] = await setup_mpris('player1', bus_address=bus_address)
283
284    assert player_list('player1') == await queue.get()
285
286    [mpris2] = await setup_mpris('player2', bus_address=bus_address)
287
288    assert player_list('player2', 'player1') == await queue.get()
289
290    # changing artist/title should bump the player up
291    await mpris1.set_artist_title('artist1', 'title1', '/1')
292
293    assert player_list('player1', 'player2') == await queue.get()
294
295    # if properties are not actually different, it shouldn't update
296    await mpris2.set_artist_title('artist2', 'title2', '/2')
297    assert player_list('player2', 'player1') == await queue.get()
298
299    await mpris1.set_artist_title('artist1', 'title1', '/1')
300    await mpris1.ping()
301    assert queue.empty()
302
303    bus.disconnect()
304    await asyncio.gather(mpris1.disconnect(), mpris2.disconnect(),
305                         bus.wait_for_disconnect())
306
307    playerctld_proc.terminate()
308    await playerctld_proc.wait()
309