1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import unittest
37
38from datetime import datetime
39
40import adb
41
42def requires_root(func):
43    def wrapper(self, *args):
44        if self.device.get_prop('ro.debuggable') != '1':
45            raise unittest.SkipTest('requires rootable build')
46
47        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
48        if not was_root:
49            self.device.root()
50            self.device.wait()
51
52        try:
53            func(self, *args)
54        finally:
55            if not was_root:
56                self.device.unroot()
57                self.device.wait()
58
59    return wrapper
60
61
62def requires_non_root(func):
63    def wrapper(self, *args):
64        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
65        if was_root:
66            self.device.unroot()
67            self.device.wait()
68
69        try:
70            func(self, *args)
71        finally:
72            if was_root:
73                self.device.root()
74                self.device.wait()
75
76    return wrapper
77
78
79class DeviceTest(unittest.TestCase):
80    def setUp(self):
81        self.device = adb.get_device()
82
83
84class ForwardReverseTest(DeviceTest):
85    def _test_no_rebind(self, description, direction_list, direction,
86                       direction_no_rebind, direction_remove_all):
87        msg = direction_list()
88        self.assertEqual('', msg.strip(),
89                         description + ' list must be empty to run this test.')
90
91        # Use --no-rebind with no existing binding
92        direction_no_rebind('tcp:5566', 'tcp:6655')
93        msg = direction_list()
94        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
95
96        # Use --no-rebind with existing binding
97        with self.assertRaises(subprocess.CalledProcessError):
98            direction_no_rebind('tcp:5566', 'tcp:6677')
99        msg = direction_list()
100        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
101        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
102
103        # Use the absence of --no-rebind with existing binding
104        direction('tcp:5566', 'tcp:6677')
105        msg = direction_list()
106        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
107        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
108
109        direction_remove_all()
110        msg = direction_list()
111        self.assertEqual('', msg.strip())
112
113    def test_forward_no_rebind(self):
114        self._test_no_rebind('forward', self.device.forward_list,
115                            self.device.forward, self.device.forward_no_rebind,
116                            self.device.forward_remove_all)
117
118    def test_reverse_no_rebind(self):
119        self._test_no_rebind('reverse', self.device.reverse_list,
120                            self.device.reverse, self.device.reverse_no_rebind,
121                            self.device.reverse_remove_all)
122
123    def test_forward(self):
124        msg = self.device.forward_list()
125        self.assertEqual('', msg.strip(),
126                         'Forwarding list must be empty to run this test.')
127        self.device.forward('tcp:5566', 'tcp:6655')
128        msg = self.device.forward_list()
129        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
130        self.device.forward('tcp:7788', 'tcp:8877')
131        msg = self.device.forward_list()
132        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
133        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
134        self.device.forward_remove('tcp:5566')
135        msg = self.device.forward_list()
136        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
137        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
138        self.device.forward_remove_all()
139        msg = self.device.forward_list()
140        self.assertEqual('', msg.strip())
141
142    def test_forward_old_protocol(self):
143        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
144
145        msg = self.device.forward_list()
146        self.assertEqual('', msg.strip(),
147                         'Forwarding list must be empty to run this test.')
148
149        s = socket.create_connection(("localhost", 5037))
150        service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
151        cmd = b"%04x%s" % (len(service), service)
152        s.sendall(cmd)
153
154        msg = self.device.forward_list()
155        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
156
157        self.device.forward_remove_all()
158        msg = self.device.forward_list()
159        self.assertEqual('', msg.strip())
160
161    def test_forward_tcp_port_0(self):
162        self.assertEqual('', self.device.forward_list().strip(),
163                         'Forwarding list must be empty to run this test.')
164
165        try:
166            # If resolving TCP port 0 is supported, `adb forward` will print
167            # the actual port number.
168            port = self.device.forward('tcp:0', 'tcp:8888').strip()
169            if not port:
170                raise unittest.SkipTest('Forwarding tcp:0 is not available.')
171
172            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
173                                      self.device.forward_list()))
174        finally:
175            self.device.forward_remove_all()
176
177    def test_reverse(self):
178        msg = self.device.reverse_list()
179        self.assertEqual('', msg.strip(),
180                         'Reverse forwarding list must be empty to run this test.')
181        self.device.reverse('tcp:5566', 'tcp:6655')
182        msg = self.device.reverse_list()
183        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
184        self.device.reverse('tcp:7788', 'tcp:8877')
185        msg = self.device.reverse_list()
186        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
187        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
188        self.device.reverse_remove('tcp:5566')
189        msg = self.device.reverse_list()
190        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
191        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
192        self.device.reverse_remove_all()
193        msg = self.device.reverse_list()
194        self.assertEqual('', msg.strip())
195
196    def test_reverse_tcp_port_0(self):
197        self.assertEqual('', self.device.reverse_list().strip(),
198                         'Reverse list must be empty to run this test.')
199
200        try:
201            # If resolving TCP port 0 is supported, `adb reverse` will print
202            # the actual port number.
203            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
204            if not port:
205                raise unittest.SkipTest('Reversing tcp:0 is not available.')
206
207            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
208                                      self.device.reverse_list()))
209        finally:
210            self.device.reverse_remove_all()
211
212    def test_forward_reverse_echo(self):
213        """Send data through adb forward and read it back via adb reverse"""
214        forward_port = 12345
215        reverse_port = forward_port + 1
216        forward_spec = 'tcp:' + str(forward_port)
217        reverse_spec = 'tcp:' + str(reverse_port)
218        forward_setup = False
219        reverse_setup = False
220
221        try:
222            # listen on localhost:forward_port, connect to remote:forward_port
223            self.device.forward(forward_spec, forward_spec)
224            forward_setup = True
225            # listen on remote:forward_port, connect to localhost:reverse_port
226            self.device.reverse(forward_spec, reverse_spec)
227            reverse_setup = True
228
229            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
230            with contextlib.closing(listener):
231                # Use SO_REUSEADDR so that subsequent runs of the test can grab
232                # the port even if it is in TIME_WAIT.
233                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
234
235                # Listen on localhost:reverse_port before connecting to
236                # localhost:forward_port because that will cause adb to connect
237                # back to localhost:reverse_port.
238                listener.bind(('127.0.0.1', reverse_port))
239                listener.listen(4)
240
241                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242                with contextlib.closing(client):
243                    # Connect to the listener.
244                    client.connect(('127.0.0.1', forward_port))
245
246                    # Accept the client connection.
247                    accepted_connection, addr = listener.accept()
248                    with contextlib.closing(accepted_connection) as server:
249                        data = 'hello'
250
251                        # Send data into the port setup by adb forward.
252                        client.sendall(data)
253                        # Explicitly close() so that server gets EOF.
254                        client.close()
255
256                        # Verify that the data came back via adb reverse.
257                        self.assertEqual(data, server.makefile().read())
258        finally:
259            if reverse_setup:
260                self.device.reverse_remove(forward_spec)
261            if forward_setup:
262                self.device.forward_remove(forward_spec)
263
264
265class ShellTest(DeviceTest):
266    def _interactive_shell(self, shell_args, input):
267        """Runs an interactive adb shell.
268
269        Args:
270          shell_args: List of string arguments to `adb shell`.
271          input: String input to send to the interactive shell.
272
273        Returns:
274          The remote exit code.
275
276        Raises:
277          unittest.SkipTest: The device doesn't support exit codes.
278        """
279        if not self.device.has_shell_protocol():
280            raise unittest.SkipTest('exit codes are unavailable on this device')
281
282        proc = subprocess.Popen(
283                self.device.adb_cmd + ['shell'] + shell_args,
284                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
285                stderr=subprocess.PIPE)
286        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
287        # to explicitly add an exit command to close the session from the device
288        # side, plus the necessary newline to complete the interactive command.
289        proc.communicate(input + '; exit\n')
290        return proc.returncode
291
292    def test_cat(self):
293        """Check that we can at least cat a file."""
294        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
295        elements = out.split()
296        self.assertEqual(len(elements), 2)
297
298        uptime, idle = elements
299        self.assertGreater(float(uptime), 0.0)
300        self.assertGreater(float(idle), 0.0)
301
302    def test_throws_on_failure(self):
303        self.assertRaises(adb.ShellError, self.device.shell, ['false'])
304
305    def test_output_not_stripped(self):
306        out = self.device.shell(['echo', 'foo'])[0]
307        self.assertEqual(out, 'foo' + self.device.linesep)
308
309    def test_shell_command_length(self):
310        # Devices that have shell_v2 should be able to handle long commands.
311        if self.device.has_shell_protocol():
312            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
313            self.assertEqual(rc, 0)
314            self.assertTrue(out == ('x' * 16384 + '\n'))
315
316    def test_shell_nocheck_failure(self):
317        rc, out, _ = self.device.shell_nocheck(['false'])
318        self.assertNotEqual(rc, 0)
319        self.assertEqual(out, '')
320
321    def test_shell_nocheck_output_not_stripped(self):
322        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
323        self.assertEqual(rc, 0)
324        self.assertEqual(out, 'foo' + self.device.linesep)
325
326    def test_can_distinguish_tricky_results(self):
327        # If result checking on ADB shell is naively implemented as
328        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
329        # output from the result for a cmd of `echo -n 1`.
330        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
331        self.assertEqual(rc, 0)
332        self.assertEqual(out, '1')
333
334    def test_line_endings(self):
335        """Ensure that line ending translation is not happening in the pty.
336
337        Bug: http://b/19735063
338        """
339        output = self.device.shell(['uname'])[0]
340        self.assertEqual(output, 'Linux' + self.device.linesep)
341
342    def test_pty_logic(self):
343        """Tests that a PTY is allocated when it should be.
344
345        PTY allocation behavior should match ssh.
346        """
347        def check_pty(args):
348            """Checks adb shell PTY allocation.
349
350            Tests |args| for terminal and non-terminal stdin.
351
352            Args:
353                args: -Tt args in a list (e.g. ['-t', '-t']).
354
355            Returns:
356                A tuple (<terminal>, <non-terminal>). True indicates
357                the corresponding shell allocated a remote PTY.
358            """
359            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
360
361            terminal = subprocess.Popen(
362                    test_cmd, stdin=None,
363                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
364            terminal.communicate()
365
366            non_terminal = subprocess.Popen(
367                    test_cmd, stdin=subprocess.PIPE,
368                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
369            non_terminal.communicate()
370
371            return (terminal.returncode == 0, non_terminal.returncode == 0)
372
373        # -T: never allocate PTY.
374        self.assertEqual((False, False), check_pty(['-T']))
375
376        # These tests require a new device.
377        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
378            # No args: PTY only if stdin is a terminal and shell is interactive,
379            # which is difficult to reliably test from a script.
380            self.assertEqual((False, False), check_pty([]))
381
382            # -t: PTY if stdin is a terminal.
383            self.assertEqual((True, False), check_pty(['-t']))
384
385        # -t -t: always allocate PTY.
386        self.assertEqual((True, True), check_pty(['-t', '-t']))
387
388        # -tt: always allocate PTY, POSIX style (http://b/32216152).
389        self.assertEqual((True, True), check_pty(['-tt']))
390
391        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
392        # we follow the man page instead.
393        self.assertEqual((True, True), check_pty(['-ttt']))
394
395        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
396        self.assertEqual((True, True), check_pty(['-ttx']))
397
398        # -Ttt: -tt cancels out -T.
399        self.assertEqual((True, True), check_pty(['-Ttt']))
400
401        # -ttT: -T cancels out -tt.
402        self.assertEqual((False, False), check_pty(['-ttT']))
403
404    def test_shell_protocol(self):
405        """Tests the shell protocol on the device.
406
407        If the device supports shell protocol, this gives us the ability
408        to separate stdout/stderr and return the exit code directly.
409
410        Bug: http://b/19734861
411        """
412        if not self.device.has_shell_protocol():
413            raise unittest.SkipTest('shell protocol unsupported on this device')
414
415        # Shell protocol should be used by default.
416        result = self.device.shell_nocheck(
417                shlex.split('echo foo; echo bar >&2; exit 17'))
418        self.assertEqual(17, result[0])
419        self.assertEqual('foo' + self.device.linesep, result[1])
420        self.assertEqual('bar' + self.device.linesep, result[2])
421
422        self.assertEqual(17, self._interactive_shell([], 'exit 17'))
423
424        # -x flag should disable shell protocol.
425        result = self.device.shell_nocheck(
426                shlex.split('-x echo foo; echo bar >&2; exit 17'))
427        self.assertEqual(0, result[0])
428        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
429        self.assertEqual('', result[2])
430
431        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))
432
433    def test_non_interactive_sigint(self):
434        """Tests that SIGINT in a non-interactive shell kills the process.
435
436        This requires the shell protocol in order to detect the broken
437        pipe; raw data transfer mode will only see the break once the
438        subprocess tries to read or write.
439
440        Bug: http://b/23825725
441        """
442        if not self.device.has_shell_protocol():
443            raise unittest.SkipTest('shell protocol unsupported on this device')
444
445        # Start a long-running process.
446        sleep_proc = subprocess.Popen(
447                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
448                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
449                stderr=subprocess.STDOUT)
450        remote_pid = sleep_proc.stdout.readline().strip()
451        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
452        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
453
454        # Verify that the process is running, send signal, verify it stopped.
455        self.device.shell(proc_query)
456        os.kill(sleep_proc.pid, signal.SIGINT)
457        sleep_proc.communicate()
458
459        # It can take some time for the process to receive the signal and die.
460        end_time = time.time() + 3
461        while self.device.shell_nocheck(proc_query)[0] != 1:
462            self.assertFalse(time.time() > end_time,
463                             'subprocess failed to terminate in time')
464
465    def test_non_interactive_stdin(self):
466        """Tests that non-interactive shells send stdin."""
467        if not self.device.has_shell_protocol():
468            raise unittest.SkipTest('non-interactive stdin unsupported '
469                                    'on this device')
470
471        # Test both small and large inputs.
472        small_input = 'foo'
473        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
474                                                  string.digits))
475
476        for input in (small_input, large_input):
477            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
478                                    stdin=subprocess.PIPE,
479                                    stdout=subprocess.PIPE,
480                                    stderr=subprocess.PIPE)
481            stdout, stderr = proc.communicate(input)
482            self.assertEqual(input.splitlines(), stdout.splitlines())
483            self.assertEqual('', stderr)
484
485    def test_sighup(self):
486        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
487        log_path = "/data/local/tmp/adb_signal_test.log"
488
489        # Clear the output file.
490        self.device.shell_nocheck(["echo", ">", log_path])
491
492        script = """
493            trap "echo SIGINT > {path}; exit 0" SIGINT
494            trap "echo SIGHUP > {path}; exit 0" SIGHUP
495            echo Waiting
496            read
497        """.format(path=log_path)
498
499        script = ";".join([x.strip() for x in script.strip().splitlines()])
500
501        process = self.device.shell_popen([script], kill_atexit=False,
502                                          stdin=subprocess.PIPE,
503                                          stdout=subprocess.PIPE)
504
505        self.assertEqual("Waiting\n", process.stdout.readline())
506        process.send_signal(signal.SIGINT)
507        process.wait()
508
509        # Waiting for the local adb to finish is insufficient, since it hangs
510        # up immediately.
511        time.sleep(1)
512
513        stdout, _ = self.device.shell(["cat", log_path])
514        self.assertEqual(stdout.strip(), "SIGHUP")
515
516    def test_exit_stress(self):
517        """Hammer `adb shell exit 42` with multiple threads."""
518        thread_count = 48
519        result = dict()
520        def hammer(thread_idx, thread_count, result):
521            success = True
522            for i in range(thread_idx, 240, thread_count):
523                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
524                if ret != i % 256:
525                    success = False
526                    break
527            result[thread_idx] = success
528
529        threads = []
530        for i in range(thread_count):
531            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
532            thread.start()
533            threads.append(thread)
534        for thread in threads:
535            thread.join()
536        for i, success in result.iteritems():
537            self.assertTrue(success)
538
539
540class ArgumentEscapingTest(DeviceTest):
541    def test_shell_escaping(self):
542        """Make sure that argument escaping is somewhat sane."""
543
544        # http://b/19734868
545        # Note that this actually matches ssh(1)'s behavior --- it's
546        # converted to `sh -c echo hello; echo world` which sh interprets
547        # as `sh -c echo` (with an argument to that shell of "hello"),
548        # and then `echo world` back in the first shell.
549        result = self.device.shell(
550            shlex.split("sh -c 'echo hello; echo world'"))[0]
551        result = result.splitlines()
552        self.assertEqual(['', 'world'], result)
553        # If you really wanted "hello" and "world", here's what you'd do:
554        result = self.device.shell(
555            shlex.split(r'echo hello\;echo world'))[0].splitlines()
556        self.assertEqual(['hello', 'world'], result)
557
558        # http://b/15479704
559        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
560        self.assertEqual('t', result)
561        result = self.device.shell(
562            shlex.split("sh -c 'true && echo t'"))[0].strip()
563        self.assertEqual('t', result)
564
565        # http://b/20564385
566        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
567        self.assertEqual('t', result)
568        result = self.device.shell(
569            shlex.split(r'echo -n 123\;uname'))[0].strip()
570        self.assertEqual('123Linux', result)
571
572    def test_install_argument_escaping(self):
573        """Make sure that install argument escaping works."""
574        # http://b/20323053, http://b/3090932.
575        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
576            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
577                                             delete=False)
578            tf.close()
579
580            # Installing bogus .apks fails if the device supports exit codes.
581            try:
582                output = self.device.install(tf.name)
583            except subprocess.CalledProcessError as e:
584                output = e.output
585
586            self.assertIn(file_suffix, output)
587            os.remove(tf.name)
588
589
590class RootUnrootTest(DeviceTest):
591    def _test_root(self):
592        message = self.device.root()
593        if 'adbd cannot run as root in production builds' in message:
594            return
595        self.device.wait()
596        self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
597
598    def _test_unroot(self):
599        self.device.unroot()
600        self.device.wait()
601        self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
602
603    def test_root_unroot(self):
604        """Make sure that adb root and adb unroot work, using id(1)."""
605        if self.device.get_prop('ro.debuggable') != '1':
606            raise unittest.SkipTest('requires rootable build')
607
608        original_user = self.device.shell(['id', '-un'])[0].strip()
609        try:
610            if original_user == 'root':
611                self._test_unroot()
612                self._test_root()
613            elif original_user == 'shell':
614                self._test_root()
615                self._test_unroot()
616        finally:
617            if original_user == 'root':
618                self.device.root()
619            else:
620                self.device.unroot()
621            self.device.wait()
622
623
624class TcpIpTest(DeviceTest):
625    def test_tcpip_failure_raises(self):
626        """adb tcpip requires a port.
627
628        Bug: http://b/22636927
629        """
630        self.assertRaises(
631            subprocess.CalledProcessError, self.device.tcpip, '')
632        self.assertRaises(
633            subprocess.CalledProcessError, self.device.tcpip, 'foo')
634
635
636class SystemPropertiesTest(DeviceTest):
637    def test_get_prop(self):
638        self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
639
640    @requires_root
641    def test_set_prop(self):
642        prop_name = 'foo.bar'
643        self.device.shell(['setprop', prop_name, '""'])
644
645        self.device.set_prop(prop_name, 'qux')
646        self.assertEqual(
647            self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
648
649
650def compute_md5(string):
651    hsh = hashlib.md5()
652    hsh.update(string)
653    return hsh.hexdigest()
654
655
656def get_md5_prog(device):
657    """Older platforms (pre-L) had the name md5 rather than md5sum."""
658    try:
659        device.shell(['md5sum', '/proc/uptime'])
660        return 'md5sum'
661    except adb.ShellError:
662        return 'md5'
663
664
665class HostFile(object):
666    def __init__(self, handle, checksum):
667        self.handle = handle
668        self.checksum = checksum
669        self.full_path = handle.name
670        self.base_name = os.path.basename(self.full_path)
671
672
673class DeviceFile(object):
674    def __init__(self, checksum, full_path):
675        self.checksum = checksum
676        self.full_path = full_path
677        self.base_name = posixpath.basename(self.full_path)
678
679
680def make_random_host_files(in_dir, num_files):
681    min_size = 1 * (1 << 10)
682    max_size = 16 * (1 << 10)
683
684    files = []
685    for _ in xrange(num_files):
686        file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
687
688        size = random.randrange(min_size, max_size, 1024)
689        rand_str = os.urandom(size)
690        file_handle.write(rand_str)
691        file_handle.flush()
692        file_handle.close()
693
694        md5 = compute_md5(rand_str)
695        files.append(HostFile(file_handle, md5))
696    return files
697
698
699def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
700    min_size = 1 * (1 << 10)
701    max_size = 16 * (1 << 10)
702
703    files = []
704    for file_num in xrange(num_files):
705        size = random.randrange(min_size, max_size, 1024)
706
707        base_name = prefix + str(file_num)
708        full_path = posixpath.join(in_dir, base_name)
709
710        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
711                      'bs={}'.format(size), 'count=1'])
712        dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
713
714        files.append(DeviceFile(dev_md5, full_path))
715    return files
716
717
718class FileOperationsTest(DeviceTest):
719    SCRATCH_DIR = '/data/local/tmp'
720    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
721    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
722
723    def _verify_remote(self, checksum, remote_path):
724        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
725                                        remote_path])[0].split()
726        self.assertEqual(checksum, dev_md5)
727
728    def _verify_local(self, checksum, local_path):
729        with open(local_path, 'rb') as host_file:
730            host_md5 = compute_md5(host_file.read())
731            self.assertEqual(host_md5, checksum)
732
733    def test_push(self):
734        """Push a randomly generated file to specified device."""
735        kbytes = 512
736        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
737        rand_str = os.urandom(1024 * kbytes)
738        tmp.write(rand_str)
739        tmp.close()
740
741        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
742        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
743
744        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
745        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
746
747        os.remove(tmp.name)
748
749    def test_push_dir(self):
750        """Push a randomly generated directory of files to the device."""
751        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
752        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
753
754        try:
755            host_dir = tempfile.mkdtemp()
756
757            # Make sure the temp directory isn't setuid, or else adb will complain.
758            os.chmod(host_dir, 0o700)
759
760            # Create 32 random files.
761            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
762            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
763
764            for temp_file in temp_files:
765                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
766                                             os.path.basename(host_dir),
767                                             temp_file.base_name)
768                self._verify_remote(temp_file.checksum, remote_path)
769            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
770        finally:
771            if host_dir is not None:
772                shutil.rmtree(host_dir)
773
774    def disabled_test_push_empty(self):
775        """Push an empty directory to the device."""
776        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
777        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
778
779        try:
780            host_dir = tempfile.mkdtemp()
781
782            # Make sure the temp directory isn't setuid, or else adb will complain.
783            os.chmod(host_dir, 0o700)
784
785            # Create an empty directory.
786            empty_dir_path = os.path.join(host_dir, 'empty')
787            os.mkdir(empty_dir_path);
788
789            self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
790
791            remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
792            test_empty_cmd = ["[", "-d", remote_path, "]"]
793            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
794
795            self.assertEqual(rc, 0)
796            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
797        finally:
798            if host_dir is not None:
799                shutil.rmtree(host_dir)
800
801    @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
802    def test_push_symlink(self):
803        """Push a symlink.
804
805        Bug: http://b/31491920
806        """
807        try:
808            host_dir = tempfile.mkdtemp()
809
810            # Make sure the temp directory isn't setuid, or else adb will
811            # complain.
812            os.chmod(host_dir, 0o700)
813
814            with open(os.path.join(host_dir, 'foo'), 'w') as f:
815                f.write('foo')
816
817            symlink_path = os.path.join(host_dir, 'symlink')
818            os.symlink('foo', symlink_path)
819
820            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
821            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
822            self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
823            rc, out, _ = self.device.shell_nocheck(
824                ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
825            self.assertEqual(0, rc)
826            self.assertEqual(out.strip(), 'foo')
827        finally:
828            if host_dir is not None:
829                shutil.rmtree(host_dir)
830
831    def test_multiple_push(self):
832        """Push multiple files to the device in one adb push command.
833
834        Bug: http://b/25324823
835        """
836
837        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
838        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
839
840        try:
841            host_dir = tempfile.mkdtemp()
842
843            # Create some random files and a subdirectory containing more files.
844            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
845
846            subdir = os.path.join(host_dir, 'subdir')
847            os.mkdir(subdir)
848            subdir_temp_files = make_random_host_files(in_dir=subdir,
849                                                       num_files=4)
850
851            paths = map(lambda temp_file: temp_file.full_path, temp_files)
852            paths.append(subdir)
853            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
854
855            for temp_file in temp_files:
856                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
857                                             temp_file.base_name)
858                self._verify_remote(temp_file.checksum, remote_path)
859
860            for subdir_temp_file in subdir_temp_files:
861                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
862                                             # BROKEN: http://b/25394682
863                                             # 'subdir';
864                                             temp_file.base_name)
865                self._verify_remote(temp_file.checksum, remote_path)
866
867
868            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
869        finally:
870            if host_dir is not None:
871                shutil.rmtree(host_dir)
872
873    @requires_non_root
874    def test_push_error_reporting(self):
875        """Make sure that errors that occur while pushing a file get reported
876
877        Bug: http://b/26816782
878        """
879        with tempfile.NamedTemporaryFile() as tmp_file:
880            tmp_file.write('\0' * 1024 * 1024)
881            tmp_file.flush()
882            try:
883                self.device.push(local=tmp_file.name, remote='/system/')
884                self.fail('push should not have succeeded')
885            except subprocess.CalledProcessError as e:
886                output = e.output
887
888            self.assertTrue('Permission denied' in output or
889                            'Read-only file system' in output)
890
891    @requires_non_root
892    def test_push_directory_creation(self):
893        """Regression test for directory creation.
894
895        Bug: http://b/110953234
896        """
897        with tempfile.NamedTemporaryFile() as tmp_file:
898            tmp_file.write('\0' * 1024 * 1024)
899            tmp_file.flush()
900            remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
901            self.device.shell(['rm', '-rf', remote_path])
902
903            remote_path += '/filename'
904            self.device.push(local=tmp_file.name, remote=remote_path)
905
906    def disabled_test_push_multiple_slash_root(self):
907        """Regression test for pushing to //data/local/tmp.
908
909        Bug: http://b/141311284
910
911        Disabled because this broken on the adbd side as well: b/141943968
912        """
913        with tempfile.NamedTemporaryFile() as tmp_file:
914            tmp_file.write('\0' * 1024 * 1024)
915            tmp_file.flush()
916            remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
917            self.device.shell(['rm', '-rf', remote_path])
918            self.device.push(local=tmp_file.name, remote=remote_path)
919
920    def _test_pull(self, remote_file, checksum):
921        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
922        tmp_write.close()
923        self.device.pull(remote=remote_file, local=tmp_write.name)
924        with open(tmp_write.name, 'rb') as tmp_read:
925            host_contents = tmp_read.read()
926            host_md5 = compute_md5(host_contents)
927        self.assertEqual(checksum, host_md5)
928        os.remove(tmp_write.name)
929
930    @requires_non_root
931    def test_pull_error_reporting(self):
932        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
933        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
934
935        try:
936            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
937        except subprocess.CalledProcessError as e:
938            output = e.output
939
940        self.assertIn('Permission denied', output)
941
942        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
943
944    def test_pull(self):
945        """Pull a randomly generated file from specified device."""
946        kbytes = 512
947        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
948        cmd = ['dd', 'if=/dev/urandom',
949               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
950               'count={}'.format(kbytes)]
951        self.device.shell(cmd)
952        dev_md5, _ = self.device.shell(
953            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
954        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
955        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
956
957    def test_pull_dir(self):
958        """Pull a randomly generated directory of files from the device."""
959        try:
960            host_dir = tempfile.mkdtemp()
961
962            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
963            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
964
965            # Populate device directory with random files.
966            temp_files = make_random_device_files(
967                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
968
969            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
970
971            for temp_file in temp_files:
972                host_path = os.path.join(
973                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
974                    temp_file.base_name)
975                self._verify_local(temp_file.checksum, host_path)
976
977            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
978        finally:
979            if host_dir is not None:
980                shutil.rmtree(host_dir)
981
982    def test_pull_dir_symlink(self):
983        """Pull a directory into a symlink to a directory.
984
985        Bug: http://b/27362811
986        """
987        if os.name != 'posix':
988            raise unittest.SkipTest('requires POSIX')
989
990        try:
991            host_dir = tempfile.mkdtemp()
992            real_dir = os.path.join(host_dir, 'dir')
993            symlink = os.path.join(host_dir, 'symlink')
994            os.mkdir(real_dir)
995            os.symlink(real_dir, symlink)
996
997            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
998            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
999
1000            # Populate device directory with random files.
1001            temp_files = make_random_device_files(
1002                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1003
1004            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1005
1006            for temp_file in temp_files:
1007                host_path = os.path.join(
1008                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1009                    temp_file.base_name)
1010                self._verify_local(temp_file.checksum, host_path)
1011
1012            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1013        finally:
1014            if host_dir is not None:
1015                shutil.rmtree(host_dir)
1016
1017    def test_pull_dir_symlink_collision(self):
1018        """Pull a directory into a colliding symlink to directory."""
1019        if os.name != 'posix':
1020            raise unittest.SkipTest('requires POSIX')
1021
1022        try:
1023            host_dir = tempfile.mkdtemp()
1024            real_dir = os.path.join(host_dir, 'real')
1025            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1026            symlink = os.path.join(host_dir, tmp_dirname)
1027            os.mkdir(real_dir)
1028            os.symlink(real_dir, symlink)
1029
1030            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1031            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1032
1033            # Populate device directory with random files.
1034            temp_files = make_random_device_files(
1035                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1036
1037            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1038
1039            for temp_file in temp_files:
1040                host_path = os.path.join(real_dir, temp_file.base_name)
1041                self._verify_local(temp_file.checksum, host_path)
1042
1043            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1044        finally:
1045            if host_dir is not None:
1046                shutil.rmtree(host_dir)
1047
1048    def test_pull_dir_nonexistent(self):
1049        """Pull a directory of files from the device to a nonexistent path."""
1050        try:
1051            host_dir = tempfile.mkdtemp()
1052            dest_dir = os.path.join(host_dir, 'dest')
1053
1054            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1055            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1056
1057            # Populate device directory with random files.
1058            temp_files = make_random_device_files(
1059                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1060
1061            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1062
1063            for temp_file in temp_files:
1064                host_path = os.path.join(dest_dir, temp_file.base_name)
1065                self._verify_local(temp_file.checksum, host_path)
1066
1067            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1068        finally:
1069            if host_dir is not None:
1070                shutil.rmtree(host_dir)
1071
1072    # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1073    def disabled_test_pull_symlink_dir(self):
1074        """Pull a symlink to a directory of symlinks to files."""
1075        try:
1076            host_dir = tempfile.mkdtemp()
1077
1078            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1079            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1080            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1081
1082            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1083            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1084            self.device.shell(['ln', '-s', remote_links, remote_symlink])
1085
1086            # Populate device directory with random files.
1087            temp_files = make_random_device_files(
1088                self.device, in_dir=remote_dir, num_files=32)
1089
1090            for temp_file in temp_files:
1091                self.device.shell(
1092                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1093                     posixpath.join(remote_links, temp_file.base_name)])
1094
1095            self.device.pull(remote=remote_symlink, local=host_dir)
1096
1097            for temp_file in temp_files:
1098                host_path = os.path.join(
1099                    host_dir, 'symlink', temp_file.base_name)
1100                self._verify_local(temp_file.checksum, host_path)
1101
1102            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1103        finally:
1104            if host_dir is not None:
1105                shutil.rmtree(host_dir)
1106
1107    def test_pull_empty(self):
1108        """Pull a directory containing an empty directory from the device."""
1109        try:
1110            host_dir = tempfile.mkdtemp()
1111
1112            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1113            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1114            self.device.shell(['mkdir', '-p', remote_empty_path])
1115
1116            self.device.pull(remote=remote_empty_path, local=host_dir)
1117            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1118        finally:
1119            if host_dir is not None:
1120                shutil.rmtree(host_dir)
1121
1122    def test_multiple_pull(self):
1123        """Pull a randomly generated directory of files from the device."""
1124
1125        try:
1126            host_dir = tempfile.mkdtemp()
1127
1128            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1129            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1130            self.device.shell(['mkdir', '-p', subdir])
1131
1132            # Create some random files and a subdirectory containing more files.
1133            temp_files = make_random_device_files(
1134                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1135
1136            subdir_temp_files = make_random_device_files(
1137                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1138
1139            paths = map(lambda temp_file: temp_file.full_path, temp_files)
1140            paths.append(subdir)
1141            self.device._simple_call(['pull'] + paths + [host_dir])
1142
1143            for temp_file in temp_files:
1144                local_path = os.path.join(host_dir, temp_file.base_name)
1145                self._verify_local(temp_file.checksum, local_path)
1146
1147            for subdir_temp_file in subdir_temp_files:
1148                local_path = os.path.join(host_dir,
1149                                          'subdir',
1150                                          subdir_temp_file.base_name)
1151                self._verify_local(subdir_temp_file.checksum, local_path)
1152
1153            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1154        finally:
1155            if host_dir is not None:
1156                shutil.rmtree(host_dir)
1157
1158    def verify_sync(self, device, temp_files, device_dir):
1159        """Verifies that a list of temp files was synced to the device."""
1160        # Confirm that every file on the device mirrors that on the host.
1161        for temp_file in temp_files:
1162            device_full_path = posixpath.join(
1163                device_dir, temp_file.base_name)
1164            dev_md5, _ = device.shell(
1165                [get_md5_prog(self.device), device_full_path])[0].split()
1166            self.assertEqual(temp_file.checksum, dev_md5)
1167
1168    def test_sync(self):
1169        """Sync a host directory to the data partition."""
1170
1171        try:
1172            base_dir = tempfile.mkdtemp()
1173
1174            # Create mirror device directory hierarchy within base_dir.
1175            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1176            os.makedirs(full_dir_path)
1177
1178            # Create 32 random files within the host mirror.
1179            temp_files = make_random_host_files(
1180                in_dir=full_dir_path, num_files=32)
1181
1182            # Clean up any stale files on the device.
1183            device = adb.get_device()  # pylint: disable=no-member
1184            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1185
1186            old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1187            os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1188            device.sync('data')
1189            if old_product_out is None:
1190                del os.environ['ANDROID_PRODUCT_OUT']
1191            else:
1192                os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1193
1194            self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1195
1196            #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1197        finally:
1198            if base_dir is not None:
1199                shutil.rmtree(base_dir)
1200
1201    def test_push_sync(self):
1202        """Sync a host directory to a specific path."""
1203
1204        try:
1205            temp_dir = tempfile.mkdtemp()
1206            temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1207
1208            device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1209
1210            # Clean up any stale files on the device.
1211            device = adb.get_device()  # pylint: disable=no-member
1212            device.shell(['rm', '-rf', device_dir])
1213
1214            device.push(temp_dir, device_dir, sync=True)
1215
1216            self.verify_sync(device, temp_files, device_dir)
1217
1218            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1219        finally:
1220            if temp_dir is not None:
1221                shutil.rmtree(temp_dir)
1222
1223    def test_unicode_paths(self):
1224        """Ensure that we can support non-ASCII paths, even on Windows."""
1225        name = u'로보카 폴리'
1226
1227        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1228        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1229
1230        ## push.
1231        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1232        tf.close()
1233        self.device.push(tf.name, remote_path)
1234        os.remove(tf.name)
1235        self.assertFalse(os.path.exists(tf.name))
1236
1237        # Verify that the device ended up with the expected UTF-8 path
1238        output = self.device.shell(
1239                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1240        self.assertEqual(remote_path, output)
1241
1242        # pull.
1243        self.device.pull(remote_path, tf.name)
1244        self.assertTrue(os.path.exists(tf.name))
1245        os.remove(tf.name)
1246        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1247
1248
1249class DeviceOfflineTest(DeviceTest):
1250    def _get_device_state(self, serialno):
1251        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1252        for line in output.split('\n'):
1253            m = re.match('(\S+)\s+(\S+)', line)
1254            if m and m.group(1) == serialno:
1255                return m.group(2)
1256        return None
1257
1258    def disabled_test_killed_when_pushing_a_large_file(self):
1259        """
1260           While running adb push with a large file, kill adb server.
1261           Occasionally the device becomes offline. Because the device is still
1262           reading data without realizing that the adb server has been restarted.
1263           Test if we can bring the device online automatically now.
1264           http://b/32952319
1265        """
1266        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1267        # 1. Push a large file
1268        file_path = 'tmp_large_file'
1269        try:
1270            fh = open(file_path, 'w')
1271            fh.write('\0' * (100 * 1024 * 1024))
1272            fh.close()
1273            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1274            time.sleep(0.1)
1275            # 2. Kill the adb server
1276            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1277            subproc.terminate()
1278        finally:
1279            try:
1280                os.unlink(file_path)
1281            except:
1282                pass
1283        # 3. See if the device still exist.
1284        # Sleep to wait for the adb server exit.
1285        time.sleep(0.5)
1286        # 4. The device should be online
1287        self.assertEqual(self._get_device_state(serialno), 'device')
1288
1289    def disabled_test_killed_when_pulling_a_large_file(self):
1290        """
1291           While running adb pull with a large file, kill adb server.
1292           Occasionally the device can't be connected. Because the device is trying to
1293           send a message larger than what is expected by the adb server.
1294           Test if we can bring the device online automatically now.
1295        """
1296        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1297        file_path = 'tmp_large_file'
1298        try:
1299            # 1. Create a large file on device.
1300            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1301                               'bs=1000000', 'count=100'])
1302            # 2. Pull the large file on host.
1303            subproc = subprocess.Popen(self.device.adb_cmd +
1304                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
1305            time.sleep(0.1)
1306            # 3. Kill the adb server
1307            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1308            subproc.terminate()
1309        finally:
1310            try:
1311                os.unlink(file_path)
1312            except:
1313                pass
1314        # 4. See if the device still exist.
1315        # Sleep to wait for the adb server exit.
1316        time.sleep(0.5)
1317        self.assertEqual(self._get_device_state(serialno), 'device')
1318
1319
1320    def test_packet_size_regression(self):
1321        """Test for http://b/37783561
1322
1323        Receiving packets of a length divisible by 512 but not 1024 resulted in
1324        the adb client waiting indefinitely for more input.
1325        """
1326        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1327        # Probe some surrounding values as well, for the hell of it.
1328        for base in [512] + range(1024, 1024 * 16, 1024):
1329            for offset in [-6, -5, -4]:
1330                length = base + offset
1331                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1332                       'echo', 'foo']
1333                rc, stdout, _ = self.device.shell_nocheck(cmd)
1334
1335                self.assertEqual(0, rc)
1336
1337                # Output should be '\0' * length, followed by "foo\n"
1338                self.assertEqual(length, len(stdout) - 4)
1339                self.assertEqual(stdout, "\0" * length + "foo\n")
1340
1341    def test_zero_packet(self):
1342        """Test for http://b/113070258
1343
1344        Make sure that we don't blow up when sending USB transfers that line up
1345        exactly with the USB packet size.
1346        """
1347
1348        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1349        try:
1350            for size in [512, 1024]:
1351                def listener():
1352                    cmd = ["echo foo | nc -l -p 12345; echo done"]
1353                    rc, stdout, stderr = self.device.shell_nocheck(cmd)
1354
1355                thread = threading.Thread(target=listener)
1356                thread.start()
1357
1358                # Wait a bit to let the shell command start.
1359                time.sleep(0.25)
1360
1361                sock = socket.create_connection(("localhost", local_port))
1362                with contextlib.closing(sock):
1363                    bytesWritten = sock.send("a" * size)
1364                    self.assertEqual(size, bytesWritten)
1365                    readBytes = sock.recv(4096)
1366                    self.assertEqual("foo\n", readBytes)
1367
1368                thread.join()
1369        finally:
1370            self.device.forward_remove("tcp:{}".format(local_port))
1371
1372
1373class SocketTest(DeviceTest):
1374    def test_socket_flush(self):
1375        """Test that we handle socket closure properly.
1376
1377        If we're done writing to a socket, closing before the other end has
1378        closed will send a TCP_RST if we have incoming data queued up, which
1379        may result in data that we've written being discarded.
1380
1381        Bug: http://b/74616284
1382        """
1383        s = socket.create_connection(("localhost", 5037))
1384
1385        def adb_length_prefixed(string):
1386            encoded = string.encode("utf8")
1387            result = b"%04x%s" % (len(encoded), encoded)
1388            return result
1389
1390        if "ANDROID_SERIAL" in os.environ:
1391            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
1392        else:
1393            transport_string = "host:transport-any"
1394
1395        s.sendall(adb_length_prefixed(transport_string))
1396        response = s.recv(4)
1397        self.assertEquals(b"OKAY", response)
1398
1399        shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
1400        s.sendall(adb_length_prefixed(shell_string))
1401
1402        response = s.recv(4)
1403        self.assertEquals(b"OKAY", response)
1404
1405        # Spawn a thread that dumps garbage into the socket until failure.
1406        def spam():
1407            buf = b"\0" * 16384
1408            try:
1409                while True:
1410                    s.sendall(buf)
1411            except Exception as ex:
1412                print(ex)
1413
1414        thread = threading.Thread(target=spam)
1415        thread.start()
1416
1417        time.sleep(1)
1418
1419        received = b""
1420        while True:
1421            read = s.recv(512)
1422            if len(read) == 0:
1423                break
1424            received += read
1425
1426        self.assertEquals(1024 * 1024 + len("foo\n"), len(received))
1427        thread.join()
1428
1429
1430if sys.platform == "win32":
1431    # From https://stackoverflow.com/a/38749458
1432    import os
1433    import contextlib
1434    import msvcrt
1435    import ctypes
1436    from ctypes import wintypes
1437
1438    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
1439
1440    GENERIC_READ  = 0x80000000
1441    GENERIC_WRITE = 0x40000000
1442    FILE_SHARE_READ  = 1
1443    FILE_SHARE_WRITE = 2
1444    CONSOLE_TEXTMODE_BUFFER = 1
1445    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
1446    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
1447    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1448
1449    def _check_zero(result, func, args):
1450        if not result:
1451            raise ctypes.WinError(ctypes.get_last_error())
1452        return args
1453
1454    def _check_invalid(result, func, args):
1455        if result == INVALID_HANDLE_VALUE:
1456            raise ctypes.WinError(ctypes.get_last_error())
1457        return args
1458
1459    if not hasattr(wintypes, 'LPDWORD'): # Python 2
1460        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
1461        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1462
1463    class COORD(ctypes.Structure):
1464        _fields_ = (('X', wintypes.SHORT),
1465                    ('Y', wintypes.SHORT))
1466
1467    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1468        _fields_ = (('cbSize',               wintypes.ULONG),
1469                    ('dwSize',               COORD),
1470                    ('dwCursorPosition',     COORD),
1471                    ('wAttributes',          wintypes.WORD),
1472                    ('srWindow',             wintypes.SMALL_RECT),
1473                    ('dwMaximumWindowSize',  COORD),
1474                    ('wPopupAttributes',     wintypes.WORD),
1475                    ('bFullscreenSupported', wintypes.BOOL),
1476                    ('ColorTable',           wintypes.DWORD * 16))
1477        def __init__(self, *args, **kwds):
1478            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1479                    *args, **kwds)
1480            self.cbSize = ctypes.sizeof(self)
1481
1482    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1483                                        CONSOLE_SCREEN_BUFFER_INFOEX)
1484    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1485
1486    kernel32.GetStdHandle.errcheck = _check_invalid
1487    kernel32.GetStdHandle.restype = wintypes.HANDLE
1488    kernel32.GetStdHandle.argtypes = (
1489        wintypes.DWORD,) # _In_ nStdHandle
1490
1491    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1492    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1493    kernel32.CreateConsoleScreenBuffer.argtypes = (
1494        wintypes.DWORD,        # _In_       dwDesiredAccess
1495        wintypes.DWORD,        # _In_       dwShareMode
1496        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
1497        wintypes.DWORD,        # _In_       dwFlags
1498        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData
1499
1500    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1501    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1502        wintypes.HANDLE,               # _In_  hConsoleOutput
1503        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1504
1505    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1506    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1507        wintypes.HANDLE,               # _In_  hConsoleOutput
1508        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo
1509
1510    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1511    kernel32.SetConsoleWindowInfo.argtypes = (
1512        wintypes.HANDLE,      # _In_ hConsoleOutput
1513        wintypes.BOOL,        # _In_ bAbsolute
1514        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1515
1516    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1517    kernel32.FillConsoleOutputCharacterW.argtypes = (
1518        wintypes.HANDLE,  # _In_  hConsoleOutput
1519        wintypes.WCHAR,   # _In_  cCharacter
1520        wintypes.DWORD,   # _In_  nLength
1521        COORD,            # _In_  dwWriteCoord
1522        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1523
1524    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1525    kernel32.ReadConsoleOutputCharacterW.argtypes = (
1526        wintypes.HANDLE,  # _In_  hConsoleOutput
1527        wintypes.LPWSTR,  # _Out_ lpCharacter
1528        wintypes.DWORD,   # _In_  nLength
1529        COORD,            # _In_  dwReadCoord
1530        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1531
1532    @contextlib.contextmanager
1533    def allocate_console():
1534        allocated = kernel32.AllocConsole()
1535        try:
1536            yield allocated
1537        finally:
1538            if allocated:
1539                kernel32.FreeConsole()
1540
1541    @contextlib.contextmanager
1542    def console_screen(ncols=None, nrows=None):
1543        info = CONSOLE_SCREEN_BUFFER_INFOEX()
1544        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1545        nwritten = (wintypes.DWORD * 1)()
1546        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1547        kernel32.GetConsoleScreenBufferInfoEx(
1548               hStdOut, ctypes.byref(info))
1549        if ncols is None:
1550            ncols = info.dwSize.X
1551        if nrows is None:
1552            nrows = info.dwSize.Y
1553        elif nrows > 9999:
1554            raise ValueError('nrows must be 9999 or less')
1555        fd_screen = None
1556        hScreen = kernel32.CreateConsoleScreenBuffer(
1557                    GENERIC_READ | GENERIC_WRITE,
1558                    FILE_SHARE_READ | FILE_SHARE_WRITE,
1559                    None, CONSOLE_TEXTMODE_BUFFER, None)
1560        try:
1561            fd_screen = msvcrt.open_osfhandle(
1562                            hScreen, os.O_RDWR | os.O_BINARY)
1563            kernel32.GetConsoleScreenBufferInfoEx(
1564                   hScreen, ctypes.byref(new_info))
1565            new_info.dwSize = COORD(ncols, nrows)
1566            new_info.srWindow = wintypes.SMALL_RECT(
1567                    Left=0, Top=0, Right=(ncols - 1),
1568                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1569            kernel32.SetConsoleScreenBufferInfoEx(
1570                    hScreen, ctypes.byref(new_info))
1571            kernel32.SetConsoleWindowInfo(hScreen, True,
1572                    ctypes.byref(new_info.srWindow))
1573            kernel32.FillConsoleOutputCharacterW(
1574                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1575            kernel32.SetConsoleActiveScreenBuffer(hScreen)
1576            try:
1577                yield fd_screen
1578            finally:
1579                kernel32.SetConsoleScreenBufferInfoEx(
1580                    hStdOut, ctypes.byref(info))
1581                kernel32.SetConsoleWindowInfo(hStdOut, True,
1582                        ctypes.byref(info.srWindow))
1583                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1584        finally:
1585            if fd_screen is not None:
1586                os.close(fd_screen)
1587            else:
1588                kernel32.CloseHandle(hScreen)
1589
1590    def read_screen(fd):
1591        hScreen = msvcrt.get_osfhandle(fd)
1592        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1593        kernel32.GetConsoleScreenBufferInfoEx(
1594            hScreen, ctypes.byref(csbi))
1595        ncols = csbi.dwSize.X
1596        pos = csbi.dwCursorPosition
1597        length = ncols * pos.Y + pos.X + 1
1598        buf = (ctypes.c_wchar * length)()
1599        n = (wintypes.DWORD * 1)()
1600        kernel32.ReadConsoleOutputCharacterW(
1601            hScreen, buf, length, COORD(0,0), n)
1602        lines = [buf[i:i+ncols].rstrip(u'\0')
1603                    for i in range(0, n[0], ncols)]
1604        return u'\n'.join(lines)
1605
1606@unittest.skipUnless(sys.platform == "win32", "requires Windows")
1607class WindowsConsoleTest(DeviceTest):
1608    def test_unicode_output(self):
1609        """Test Unicode command line parameters and Unicode console window output.
1610
1611        Bug: https://issuetracker.google.com/issues/111972753
1612        """
1613        # If we don't have a console window, allocate one. This isn't necessary if we're already
1614        # being run from a console window, which is typical.
1615        with allocate_console() as allocated_console:
1616            # Create a temporary console buffer and switch to it. We could also pass a parameter of
1617            # ncols=len(unicode_string), but it causes the window to flash as it is resized and
1618            # likely unnecessary given the typical console window size.
1619            with console_screen(nrows=1000) as screen:
1620                unicode_string = u'로보카 폴리'
1621                # Run adb and allow it to detect that stdout is a console, not a pipe, by using
1622                # device.shell_popen() which does not use a pipe, unlike device.shell().
1623                process = self.device.shell_popen(['echo', '"' + unicode_string + '"'])
1624                process.wait()
1625                # Read what was written by adb to the temporary console buffer.
1626                console_output = read_screen(screen)
1627                self.assertEqual(unicode_string, console_output)
1628
1629
1630def main():
1631    random.seed(0)
1632    if len(adb.get_devices()) > 0:
1633        suite = unittest.TestLoader().loadTestsFromName(__name__)
1634        unittest.TextTestRunner(verbosity=3).run(suite)
1635    else:
1636        print('Test suite must be run with attached devices')
1637
1638
1639if __name__ == '__main__':
1640    main()
1641