1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
import contextlib
import functools
import hashlib
import os
import posixpath
import random
import re
import shlex
import shutil
import signal
import socket
import string
import subprocess
import sys
import tempfile
import threading
import time
import unittest
37
38import adb
39
def requires_root(func):
    """Decorate a device test so that it runs while adbd is root.

    The wrapped test is skipped when the device's ``ro.debuggable`` property
    is not ``'1'``. If ``id -un`` on the device does not report ``root``, the
    device is switched to root before the test and switched back afterwards,
    even when the test raises.

    Args:
        func: Test method; its ``self`` must expose the device as
            ``self.device``.

    Returns:
        A wrapper that keeps ``func``'s name and docstring and returns
        whatever ``func`` returns.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original (non-root) state only if we changed it.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
58
59
def requires_non_root(func):
    """Decorate a device test so that it runs while adbd is not root.

    If ``id -un`` on the device reports ``root``, the device is unrooted
    before the test and switched back to root afterwards, even when the test
    raises.

    Args:
        func: Test method; its ``self`` must expose the device as
            ``self.device``.

    Returns:
        A wrapper that keeps ``func``'s name and docstring and returns
        whatever ``func`` returns.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore root only if the device started out as root.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
75
76
class DeviceTest(unittest.TestCase):
    """Base test case that provides the device under test as ``self.device``."""

    def setUp(self):
        """Fetch the device from the adb module before each test."""
        device = adb.get_device()
        self.device = device
80
81
class ForwardReverseTest(DeviceTest):
    """Tests for `adb forward` and `adb reverse` port bindings.

    Every test requires the relevant binding list to be empty when it starts.
    Bindings a test creates are removed in a ``finally`` block so a failed
    assertion does not leave stale entries behind for the tests that follow.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                       direction_no_rebind, direction_remove_all):
        """Check --no-rebind behaviour for one direction.

        Args:
            description: Direction name, used in the precondition message.
            direction_list: Callable returning the binding list as text.
            direction: Callable taking two socket specs; creates a binding.
            direction_no_rebind: Callable taking two socket specs; expected
                to raise subprocess.CalledProcessError when the first spec
                is already bound.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Always clean up, even when an assertion above failed.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def _test_add_remove(self, empty_msg, direction_list, direction,
                         direction_remove, direction_remove_all):
        """Check add / list / remove / remove-all for one direction.

        Args:
            empty_msg: Failure message for the "list must be empty"
                precondition.
            direction_list: Callable returning the binding list as text.
            direction: Callable taking two socket specs; creates a binding.
            direction_remove: Callable taking one socket spec; removes that
                binding.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(), empty_msg)

        try:
            direction('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            direction('tcp:7788', 'tcp:8877')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))

            direction_remove('tcp:5566')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Always clean up, even when an assertion above failed.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """`adb forward --no-rebind` refuses to replace an existing binding."""
        self._test_no_rebind('forward', self.device.forward_list,
                            self.device.forward, self.device.forward_no_rebind,
                            self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """`adb reverse --no-rebind` refuses to replace an existing binding."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                            self.device.reverse, self.device.reverse_no_rebind,
                            self.device.reverse_remove_all)

    def test_forward(self):
        """Forward bindings can be added, listed and removed."""
        self._test_add_remove(
            'Forwarding list must be empty to run this test.',
            self.device.forward_list, self.device.forward,
            self.device.forward_remove, self.device.forward_remove_all)

    def test_forward_tcp_port_0(self):
        """Forwarding tcp:0 reports the port that was actually bound."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Reverse bindings can be added, listed and removed."""
        self._test_add_remove(
            'Reverse forwarding list must be empty to run this test.',
            self.device.reverse_list, self.device.reverse,
            self.device.reverse_remove, self.device.reverse_remove_all)

    def test_reverse_tcp_port_0(self):
        """Reversing tcp:0 reports the port that was actually bound."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track which bindings were created so cleanup only removes those.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, _ = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = 'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
244
245
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTYs, stdin and signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command makes device.shell raise adb.ShellError."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """device.shell returns stdout with its trailing line separator."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """A 16384-character argument is echoed back intact."""
        # Devices that have shell_v2 should be able to handle long commands.
        if self.device.has_shell_protocol():
            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
            self.assertEqual(rc, 0)
            self.assertTrue(out == ('x' * 16384 + '\n'))

    def test_shell_nocheck_failure(self):
        """shell_nocheck reports a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck returns stdout with its trailing line separator."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Command output that looks like an exit code is kept separate."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None: the child inherits this process's stdin.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            # stdin is a pipe, so never a terminal.
            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip()
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script into a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    def test_exit_stress(self):
        """Hammer `adb shell exit N` with multiple threads.

        Each thread runs its own slice of the exit codes 0..239 and records
        whether every invocation returned the code it asked for.
        """
        thread_count = 48
        result = dict()
        # Use the same adb command line as the rest of this class so the
        # device under test is the one being hammered.
        adb_cmd = self.device.adb_cmd

        def hammer(thread_idx, thread_count, result):
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(adb_cmd + ['shell', 'exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer,
                                      args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

        # A thread that died from an exception never records a result; do not
        # let that pass silently.
        self.assertEqual(thread_count, len(result))
        for _, success in result.items():
            self.assertTrue(success)
519
520
class ArgumentEscapingTest(DeviceTest):
    """Tests for how adb passes command-line arguments to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host temp file even when the assertion fails.
                os.remove(tf.name)
569
570
class RootUnrootTest(DeviceTest):
    """Tests for `adb root` and `adb unroot`."""

    def _current_user(self):
        """Return the user name reported by `id -un` on the device."""
        stdout = self.device.shell(['id', '-un'])[0]
        return stdout.strip()

    def _test_root(self):
        """Switch to root and check the shell user, unless root is refused."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' in message:
            return
        self.device.wait()
        self.assertEqual('root', self._current_user())

    def _test_unroot(self):
        """Switch away from root and check the shell user is 'shell'."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self._current_user())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self._current_user()
        started_as_root = original_user == 'root'
        try:
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                # Any other starting user: nothing is exercised.
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back the way it was found.
            if started_as_root:
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
603
604
class TcpIpTest(DeviceTest):
    """Tests for `adb tcpip` argument handling."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Neither an empty string nor a non-numeric string is accepted.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
615
616
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties through adb."""

    def test_get_prop(self):
        """The init.svc.adbd property reads back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop is visible through getprop."""
        prop_name = 'foo.bar'
        # Start from a known value before setting the property under test.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(stdout.strip(), 'qux')
629
630
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given data."""
    return hashlib.md5(string).hexdigest()
635
636
def get_md5_prog(device):
    """Return the name of the MD5 tool to use on the device.

    Older platforms (pre-L) had the name md5 rather than md5sum. The choice
    is made by running md5sum on /proc/uptime and falling back to md5 if
    that shell command fails.
    """
    prog = 'md5sum'
    try:
        device.shell([prog, '/proc/uptime'])
    except adb.ShellError:
        prog = 'md5'
    return prog
644
645
class HostFile(object):
    """A file on the host: its handle, checksum, full path and base name."""

    def __init__(self, handle, checksum):
        """Record the handle and checksum; derive the paths from handle.name."""
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
652
653
class DeviceFile(object):
    """A file on the device: its checksum, full path and base name."""

    def __init__(self, checksum, full_path):
        """Record the checksum and path; the base name uses POSIX rules."""
        self.full_path = full_path
        self.base_name = posixpath.basename(full_path)
        self.checksum = checksum
659
660
def make_random_host_files(in_dir, num_files):
    """Create files of random bytes on the host.

    Each file holds between 1 KiB and 15 KiB of os.urandom data, in 1 KiB
    steps, and is left on disk (delete=False) for the caller to clean up.

    Args:
        in_dir: Directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per file, each carrying the (closed)
        file handle and the MD5 of the written data.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The with-block closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
678
679
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files of random bytes on the device.

    Each file is written with a single `dd` block read from /dev/urandom,
    with a block size between 1 KiB and 15 KiB in 1 KiB steps, and is then
    checksummed on the device.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file's index is appended to it.

    Returns:
        A list of DeviceFile objects, one per file.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Probing for the MD5 tool costs a device shell call; do it once rather
    # than once per file.
    md5_prog = get_md5_prog(device)

    files = []
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
697
698
699class FileOperationsTest(DeviceTest):
700    SCRATCH_DIR = '/data/local/tmp'
701    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
702    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
703
704    def _verify_remote(self, checksum, remote_path):
705        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
706                                        remote_path])[0].split()
707        self.assertEqual(checksum, dev_md5)
708
709    def _verify_local(self, checksum, local_path):
710        with open(local_path, 'rb') as host_file:
711            host_md5 = compute_md5(host_file.read())
712            self.assertEqual(host_md5, checksum)
713
714    def test_push(self):
715        """Push a randomly generated file to specified device."""
716        kbytes = 512
717        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
718        rand_str = os.urandom(1024 * kbytes)
719        tmp.write(rand_str)
720        tmp.close()
721
722        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
723        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
724
725        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
726        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
727
728        os.remove(tmp.name)
729
730    def test_push_dir(self):
731        """Push a randomly generated directory of files to the device."""
732        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
733        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
734
735        try:
736            host_dir = tempfile.mkdtemp()
737
738            # Make sure the temp directory isn't setuid, or else adb will complain.
739            os.chmod(host_dir, 0o700)
740
741            # Create 32 random files.
742            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
743            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
744
745            for temp_file in temp_files:
746                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
747                                             os.path.basename(host_dir),
748                                             temp_file.base_name)
749                self._verify_remote(temp_file.checksum, remote_path)
750            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
751        finally:
752            if host_dir is not None:
753                shutil.rmtree(host_dir)
754
755    @unittest.expectedFailure # b/25566053
756    def test_push_empty(self):
757        """Push a directory containing an empty directory to the device."""
758        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
759        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
760
761        try:
762            host_dir = tempfile.mkdtemp()
763
764            # Make sure the temp directory isn't setuid, or else adb will complain.
765            os.chmod(host_dir, 0o700)
766
767            # Create an empty directory.
768            os.mkdir(os.path.join(host_dir, 'empty'))
769
770            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
771
772            test_empty_cmd = ['[', '-d',
773                              os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
774            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
775            self.assertEqual(rc, 0)
776            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
777        finally:
778            if host_dir is not None:
779                shutil.rmtree(host_dir)
780
781    @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
782    def test_push_symlink(self):
783        """Push a symlink.
784
785        Bug: http://b/31491920
786        """
787        try:
788            host_dir = tempfile.mkdtemp()
789
790            # Make sure the temp directory isn't setuid, or else adb will
791            # complain.
792            os.chmod(host_dir, 0o700)
793
794            with open(os.path.join(host_dir, 'foo'), 'w') as f:
795                f.write('foo')
796
797            symlink_path = os.path.join(host_dir, 'symlink')
798            os.symlink('foo', symlink_path)
799
800            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
801            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
802            self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
803            rc, out, _ = self.device.shell_nocheck(
804                ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
805            self.assertEqual(0, rc)
806            self.assertEqual(out.strip(), 'foo')
807        finally:
808            if host_dir is not None:
809                shutil.rmtree(host_dir)
810
811    def test_multiple_push(self):
812        """Push multiple files to the device in one adb push command.
813
814        Bug: http://b/25324823
815        """
816
817        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
818        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
819
820        try:
821            host_dir = tempfile.mkdtemp()
822
823            # Create some random files and a subdirectory containing more files.
824            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
825
826            subdir = os.path.join(host_dir, 'subdir')
827            os.mkdir(subdir)
828            subdir_temp_files = make_random_host_files(in_dir=subdir,
829                                                       num_files=4)
830
831            paths = map(lambda temp_file: temp_file.full_path, temp_files)
832            paths.append(subdir)
833            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
834
835            for temp_file in temp_files:
836                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
837                                             temp_file.base_name)
838                self._verify_remote(temp_file.checksum, remote_path)
839
840            for subdir_temp_file in subdir_temp_files:
841                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
842                                             # BROKEN: http://b/25394682
843                                             # 'subdir';
844                                             temp_file.base_name)
845                self._verify_remote(temp_file.checksum, remote_path)
846
847
848            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
849        finally:
850            if host_dir is not None:
851                shutil.rmtree(host_dir)
852
853    @requires_non_root
854    def test_push_error_reporting(self):
855        """Make sure that errors that occur while pushing a file get reported
856
857        Bug: http://b/26816782
858        """
859        with tempfile.NamedTemporaryFile() as tmp_file:
860            tmp_file.write('\0' * 1024 * 1024)
861            tmp_file.flush()
862            try:
863                self.device.push(local=tmp_file.name, remote='/system/')
864                self.fail('push should not have succeeded')
865            except subprocess.CalledProcessError as e:
866                output = e.output
867
868            self.assertTrue('Permission denied' in output or
869                            'Read-only file system' in output)
870
871    def _test_pull(self, remote_file, checksum):
872        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
873        tmp_write.close()
874        self.device.pull(remote=remote_file, local=tmp_write.name)
875        with open(tmp_write.name, 'rb') as tmp_read:
876            host_contents = tmp_read.read()
877            host_md5 = compute_md5(host_contents)
878        self.assertEqual(checksum, host_md5)
879        os.remove(tmp_write.name)
880
881    @requires_non_root
882    def test_pull_error_reporting(self):
883        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
884        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
885
886        try:
887            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
888        except subprocess.CalledProcessError as e:
889            output = e.output
890
891        self.assertIn('Permission denied', output)
892
893        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
894
895    def test_pull(self):
896        """Pull a randomly generated file from specified device."""
897        kbytes = 512
898        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
899        cmd = ['dd', 'if=/dev/urandom',
900               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
901               'count={}'.format(kbytes)]
902        self.device.shell(cmd)
903        dev_md5, _ = self.device.shell(
904            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
905        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
906        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
907
908    def test_pull_dir(self):
909        """Pull a randomly generated directory of files from the device."""
910        try:
911            host_dir = tempfile.mkdtemp()
912
913            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
914            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
915
916            # Populate device directory with random files.
917            temp_files = make_random_device_files(
918                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
919
920            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
921
922            for temp_file in temp_files:
923                host_path = os.path.join(
924                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
925                    temp_file.base_name)
926                self._verify_local(temp_file.checksum, host_path)
927
928            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
929        finally:
930            if host_dir is not None:
931                shutil.rmtree(host_dir)
932
933    def test_pull_dir_symlink(self):
934        """Pull a directory into a symlink to a directory.
935
936        Bug: http://b/27362811
937        """
938        if os.name != 'posix':
939            raise unittest.SkipTest('requires POSIX')
940
941        try:
942            host_dir = tempfile.mkdtemp()
943            real_dir = os.path.join(host_dir, 'dir')
944            symlink = os.path.join(host_dir, 'symlink')
945            os.mkdir(real_dir)
946            os.symlink(real_dir, symlink)
947
948            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
949            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
950
951            # Populate device directory with random files.
952            temp_files = make_random_device_files(
953                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
954
955            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
956
957            for temp_file in temp_files:
958                host_path = os.path.join(
959                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
960                    temp_file.base_name)
961                self._verify_local(temp_file.checksum, host_path)
962
963            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
964        finally:
965            if host_dir is not None:
966                shutil.rmtree(host_dir)
967
968    def test_pull_dir_symlink_collision(self):
969        """Pull a directory into a colliding symlink to directory."""
970        if os.name != 'posix':
971            raise unittest.SkipTest('requires POSIX')
972
973        try:
974            host_dir = tempfile.mkdtemp()
975            real_dir = os.path.join(host_dir, 'real')
976            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
977            symlink = os.path.join(host_dir, tmp_dirname)
978            os.mkdir(real_dir)
979            os.symlink(real_dir, symlink)
980
981            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
982            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
983
984            # Populate device directory with random files.
985            temp_files = make_random_device_files(
986                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
987
988            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
989
990            for temp_file in temp_files:
991                host_path = os.path.join(real_dir, temp_file.base_name)
992                self._verify_local(temp_file.checksum, host_path)
993
994            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
995        finally:
996            if host_dir is not None:
997                shutil.rmtree(host_dir)
998
999    def test_pull_dir_nonexistent(self):
1000        """Pull a directory of files from the device to a nonexistent path."""
1001        try:
1002            host_dir = tempfile.mkdtemp()
1003            dest_dir = os.path.join(host_dir, 'dest')
1004
1005            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1006            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1007
1008            # Populate device directory with random files.
1009            temp_files = make_random_device_files(
1010                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1011
1012            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1013
1014            for temp_file in temp_files:
1015                host_path = os.path.join(dest_dir, temp_file.base_name)
1016                self._verify_local(temp_file.checksum, host_path)
1017
1018            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1019        finally:
1020            if host_dir is not None:
1021                shutil.rmtree(host_dir)
1022
1023    def test_pull_symlink_dir(self):
1024        """Pull a symlink to a directory of symlinks to files."""
1025        try:
1026            host_dir = tempfile.mkdtemp()
1027
1028            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1029            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1030            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1031
1032            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1033            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1034            self.device.shell(['ln', '-s', remote_links, remote_symlink])
1035
1036            # Populate device directory with random files.
1037            temp_files = make_random_device_files(
1038                self.device, in_dir=remote_dir, num_files=32)
1039
1040            for temp_file in temp_files:
1041                self.device.shell(
1042                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1043                     posixpath.join(remote_links, temp_file.base_name)])
1044
1045            self.device.pull(remote=remote_symlink, local=host_dir)
1046
1047            for temp_file in temp_files:
1048                host_path = os.path.join(
1049                    host_dir, 'symlink', temp_file.base_name)
1050                self._verify_local(temp_file.checksum, host_path)
1051
1052            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1053        finally:
1054            if host_dir is not None:
1055                shutil.rmtree(host_dir)
1056
1057    def test_pull_empty(self):
1058        """Pull a directory containing an empty directory from the device."""
1059        try:
1060            host_dir = tempfile.mkdtemp()
1061
1062            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1063            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1064            self.device.shell(['mkdir', '-p', remote_empty_path])
1065
1066            self.device.pull(remote=remote_empty_path, local=host_dir)
1067            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1068        finally:
1069            if host_dir is not None:
1070                shutil.rmtree(host_dir)
1071
1072    def test_multiple_pull(self):
1073        """Pull a randomly generated directory of files from the device."""
1074
1075        try:
1076            host_dir = tempfile.mkdtemp()
1077
1078            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1079            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1080            self.device.shell(['mkdir', '-p', subdir])
1081
1082            # Create some random files and a subdirectory containing more files.
1083            temp_files = make_random_device_files(
1084                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1085
1086            subdir_temp_files = make_random_device_files(
1087                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1088
1089            paths = map(lambda temp_file: temp_file.full_path, temp_files)
1090            paths.append(subdir)
1091            self.device._simple_call(['pull'] + paths + [host_dir])
1092
1093            for temp_file in temp_files:
1094                local_path = os.path.join(host_dir, temp_file.base_name)
1095                self._verify_local(temp_file.checksum, local_path)
1096
1097            for subdir_temp_file in subdir_temp_files:
1098                local_path = os.path.join(host_dir,
1099                                          'subdir',
1100                                          subdir_temp_file.base_name)
1101                self._verify_local(subdir_temp_file.checksum, local_path)
1102
1103            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1104        finally:
1105            if host_dir is not None:
1106                shutil.rmtree(host_dir)
1107
1108    def verify_sync(self, device, temp_files, device_dir):
1109        """Verifies that a list of temp files was synced to the device."""
1110        # Confirm that every file on the device mirrors that on the host.
1111        for temp_file in temp_files:
1112            device_full_path = posixpath.join(
1113                device_dir, temp_file.base_name)
1114            dev_md5, _ = device.shell(
1115                [get_md5_prog(self.device), device_full_path])[0].split()
1116            self.assertEqual(temp_file.checksum, dev_md5)
1117
1118    def test_sync(self):
1119        """Sync a host directory to the data partition."""
1120
1121        try:
1122            base_dir = tempfile.mkdtemp()
1123
1124            # Create mirror device directory hierarchy within base_dir.
1125            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1126            os.makedirs(full_dir_path)
1127
1128            # Create 32 random files within the host mirror.
1129            temp_files = make_random_host_files(
1130                in_dir=full_dir_path, num_files=32)
1131
1132            # Clean up any stale files on the device.
1133            device = adb.get_device()  # pylint: disable=no-member
1134            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1135
1136            old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1137            os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1138            device.sync('data')
1139            if old_product_out is None:
1140                del os.environ['ANDROID_PRODUCT_OUT']
1141            else:
1142                os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1143
1144            self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1145
1146            #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1147        finally:
1148            if base_dir is not None:
1149                shutil.rmtree(base_dir)
1150
1151    def test_push_sync(self):
1152        """Sync a host directory to a specific path."""
1153
1154        try:
1155            temp_dir = tempfile.mkdtemp()
1156            temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1157
1158            device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1159
1160            # Clean up any stale files on the device.
1161            device = adb.get_device()  # pylint: disable=no-member
1162            device.shell(['rm', '-rf', device_dir])
1163
1164            device.push(temp_dir, device_dir, sync=True)
1165
1166            self.verify_sync(device, temp_files, device_dir)
1167
1168            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1169        finally:
1170            if temp_dir is not None:
1171                shutil.rmtree(temp_dir)
1172
1173    def test_unicode_paths(self):
1174        """Ensure that we can support non-ASCII paths, even on Windows."""
1175        name = u'로보카 폴리'
1176
1177        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1178        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1179
1180        ## push.
1181        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1182        tf.close()
1183        self.device.push(tf.name, remote_path)
1184        os.remove(tf.name)
1185        self.assertFalse(os.path.exists(tf.name))
1186
1187        # Verify that the device ended up with the expected UTF-8 path
1188        output = self.device.shell(
1189                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1190        self.assertEqual(remote_path.encode('utf-8'), output)
1191
1192        # pull.
1193        self.device.pull(remote_path, tf.name)
1194        self.assertTrue(os.path.exists(tf.name))
1195        os.remove(tf.name)
1196        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1197
1198
class DeviceOfflineTest(DeviceTest):
    """Tests around the device staying usable across adb server restarts
    and around packet-size handling in the shell protocol."""

    def _get_device_state(self, serialno):
        """Return the state `adb devices` reports for serialno.

        Each output line is matched as two whitespace-separated fields; the
        second field of the line whose first field equals serialno is
        returned, or None when no such line exists.
        """
        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
        for line in output.split('\n'):
            # Raw string so the backslashes reach the regex engine untouched.
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill adb server.
           Occasionally the device becomes offline. Because the device is still
           reading data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            # Use a context manager so the handle is closed even if the
            # write fails part-way.
            with open(file_path, 'w') as fh:
                fh.write('\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            time.sleep(0.1)
            # 2. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
        finally:
            # Best-effort cleanup: the file may not exist if creation failed.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 3. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill adb server.
           Occasionally the device can't be connected. Because the device is trying to
           send a message larger than what is expected by the adb server.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
            time.sleep(0.1)
            # 3. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
        finally:
            # Best-effort cleanup: the pull may have been killed before the
            # host file was created.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 4. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')


    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        # list() keeps the concatenation valid where range() is not a list.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;echo' is a single argument on purpose: it keeps
                # the command line identical to what has always been sent.
                # The device shell is expected to treat ';' as a separator.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")
1290
1291
def main():
    """Run every test in this module with a verbose text runner.

    The random module is seeded with 0 first. When adb reports no devices,
    a message is printed and no tests are run.
    """
    random.seed(0)

    device_count = len(adb.get_devices())
    if device_count == 0:
        print('Test suite must be run with attached devices')
        return

    # Load first, then build the runner, matching the usual unittest flow.
    suite = unittest.TestLoader().loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)


# Only run the suite when executed as a script, not when imported.
if __name__ == '__main__':
    main()
1303