1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38import adb 39 40def requires_root(func): 41 def wrapper(self, *args): 42 if self.device.get_prop('ro.debuggable') != '1': 43 raise unittest.SkipTest('requires rootable build') 44 45 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 46 if not was_root: 47 self.device.root() 48 self.device.wait() 49 50 try: 51 func(self, *args) 52 finally: 53 if not was_root: 54 self.device.unroot() 55 self.device.wait() 56 57 return wrapper 58 59 60def requires_non_root(func): 61 def wrapper(self, *args): 62 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 63 if was_root: 64 self.device.unroot() 65 self.device.wait() 66 67 try: 68 func(self, *args) 69 finally: 70 if was_root: 71 self.device.root() 72 self.device.wait() 73 74 return wrapper 75 76 77class DeviceTest(unittest.TestCase): 78 def setUp(self): 79 self.device = adb.get_device() 80 81 82class ForwardReverseTest(DeviceTest): 83 def _test_no_rebind(self, description, direction_list, direction, 84 direction_no_rebind, direction_remove_all): 85 msg = direction_list() 86 self.assertEqual('', msg.strip(), 87 description + ' list must be empty to run this test.') 88 89 # Use --no-rebind with no existing binding 90 direction_no_rebind('tcp:5566', 'tcp:6655') 91 msg = direction_list() 92 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 93 94 # Use --no-rebind with existing binding 95 with self.assertRaises(subprocess.CalledProcessError): 96 direction_no_rebind('tcp:5566', 'tcp:6677') 97 msg = direction_list() 98 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 99 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 100 101 # Use the absence of --no-rebind with existing binding 102 direction('tcp:5566', 'tcp:6677') 103 msg = direction_list() 104 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 105 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 106 107 direction_remove_all() 108 msg = direction_list() 109 self.assertEqual('', msg.strip()) 110 111 def test_forward_no_rebind(self): 112 self._test_no_rebind('forward', self.device.forward_list, 113 self.device.forward, self.device.forward_no_rebind, 114 self.device.forward_remove_all) 115 116 def test_reverse_no_rebind(self): 117 self._test_no_rebind('reverse', self.device.reverse_list, 118 self.device.reverse, self.device.reverse_no_rebind, 119 self.device.reverse_remove_all) 120 121 def test_forward(self): 122 msg = self.device.forward_list() 123 self.assertEqual('', msg.strip(), 124 'Forwarding list must be empty to run this test.') 125 self.device.forward('tcp:5566', 'tcp:6655') 126 msg = self.device.forward_list() 127 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 128 self.device.forward('tcp:7788', 'tcp:8877') 129 msg = self.device.forward_list() 130 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 131 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 132 self.device.forward_remove('tcp:5566') 133 msg = self.device.forward_list() 134 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 135 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 136 self.device.forward_remove_all() 137 msg = self.device.forward_list() 138 self.assertEqual('', msg.strip()) 139 140 def test_forward_tcp_port_0(self): 141 self.assertEqual('', self.device.forward_list().strip(), 142 'Forwarding list must be empty to run this test.') 143 144 try: 145 # If resolving TCP port 0 is supported, `adb forward` will print 146 # the actual port number. 147 port = self.device.forward('tcp:0', 'tcp:8888').strip() 148 if not port: 149 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 150 151 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 152 self.device.forward_list())) 153 finally: 154 self.device.forward_remove_all() 155 156 def test_reverse(self): 157 msg = self.device.reverse_list() 158 self.assertEqual('', msg.strip(), 159 'Reverse forwarding list must be empty to run this test.') 160 self.device.reverse('tcp:5566', 'tcp:6655') 161 msg = self.device.reverse_list() 162 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 163 self.device.reverse('tcp:7788', 'tcp:8877') 164 msg = self.device.reverse_list() 165 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 166 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 167 self.device.reverse_remove('tcp:5566') 168 msg = self.device.reverse_list() 169 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 170 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 171 self.device.reverse_remove_all() 172 msg = self.device.reverse_list() 173 self.assertEqual('', msg.strip()) 174 175 def test_reverse_tcp_port_0(self): 176 self.assertEqual('', self.device.reverse_list().strip(), 177 'Reverse list must be empty to run this test.') 178 179 try: 180 # If resolving TCP port 0 is supported, `adb reverse` will print 181 # the actual port number. 182 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 183 if not port: 184 raise unittest.SkipTest('Reversing tcp:0 is not available.') 185 186 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 187 self.device.reverse_list())) 188 finally: 189 self.device.reverse_remove_all() 190 191 # Note: If you run this test when adb connect'd to a physical device over 192 # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821 193 def test_forward_reverse_echo(self): 194 """Send data through adb forward and read it back via adb reverse""" 195 forward_port = 12345 196 reverse_port = forward_port + 1 197 forward_spec = 'tcp:' + str(forward_port) 198 reverse_spec = 'tcp:' + str(reverse_port) 199 forward_setup = False 200 reverse_setup = False 201 202 try: 203 # listen on localhost:forward_port, connect to remote:forward_port 204 self.device.forward(forward_spec, forward_spec) 205 forward_setup = True 206 # listen on remote:forward_port, connect to localhost:reverse_port 207 self.device.reverse(forward_spec, reverse_spec) 208 reverse_setup = True 209 210 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 211 with contextlib.closing(listener): 212 # Use SO_REUSEADDR so that subsequent runs of the test can grab 213 # the port even if it is in TIME_WAIT. 214 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 215 216 # Listen on localhost:reverse_port before connecting to 217 # localhost:forward_port because that will cause adb to connect 218 # back to localhost:reverse_port. 219 listener.bind(('127.0.0.1', reverse_port)) 220 listener.listen(4) 221 222 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 223 with contextlib.closing(client): 224 # Connect to the listener. 225 client.connect(('127.0.0.1', forward_port)) 226 227 # Accept the client connection. 228 accepted_connection, addr = listener.accept() 229 with contextlib.closing(accepted_connection) as server: 230 data = 'hello' 231 232 # Send data into the port setup by adb forward. 233 client.sendall(data) 234 # Explicitly close() so that server gets EOF. 235 client.close() 236 237 # Verify that the data came back via adb reverse. 238 self.assertEqual(data, server.makefile().read()) 239 finally: 240 if reverse_setup: 241 self.device.reverse_remove(forward_spec) 242 if forward_setup: 243 self.device.forward_remove(forward_spec) 244 245 246class ShellTest(DeviceTest): 247 def _interactive_shell(self, shell_args, input): 248 """Runs an interactive adb shell. 249 250 Args: 251 shell_args: List of string arguments to `adb shell`. 252 input: String input to send to the interactive shell. 253 254 Returns: 255 The remote exit code. 256 257 Raises: 258 unittest.SkipTest: The device doesn't support exit codes. 259 """ 260 if not self.device.has_shell_protocol(): 261 raise unittest.SkipTest('exit codes are unavailable on this device') 262 263 proc = subprocess.Popen( 264 self.device.adb_cmd + ['shell'] + shell_args, 265 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 266 stderr=subprocess.PIPE) 267 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 268 # to explicitly add an exit command to close the session from the device 269 # side, plus the necessary newline to complete the interactive command. 270 proc.communicate(input + '; exit\n') 271 return proc.returncode 272 273 def test_cat(self): 274 """Check that we can at least cat a file.""" 275 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 276 elements = out.split() 277 self.assertEqual(len(elements), 2) 278 279 uptime, idle = elements 280 self.assertGreater(float(uptime), 0.0) 281 self.assertGreater(float(idle), 0.0) 282 283 def test_throws_on_failure(self): 284 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 285 286 def test_output_not_stripped(self): 287 out = self.device.shell(['echo', 'foo'])[0] 288 self.assertEqual(out, 'foo' + self.device.linesep) 289 290 def test_shell_command_length(self): 291 # Devices that have shell_v2 should be able to handle long commands. 292 if self.device.has_shell_protocol(): 293 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 294 self.assertEqual(rc, 0) 295 self.assertTrue(out == ('x' * 16384 + '\n')) 296 297 def test_shell_nocheck_failure(self): 298 rc, out, _ = self.device.shell_nocheck(['false']) 299 self.assertNotEqual(rc, 0) 300 self.assertEqual(out, '') 301 302 def test_shell_nocheck_output_not_stripped(self): 303 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 304 self.assertEqual(rc, 0) 305 self.assertEqual(out, 'foo' + self.device.linesep) 306 307 def test_can_distinguish_tricky_results(self): 308 # If result checking on ADB shell is naively implemented as 309 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 310 # output from the result for a cmd of `echo -n 1`. 311 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 312 self.assertEqual(rc, 0) 313 self.assertEqual(out, '1') 314 315 def test_line_endings(self): 316 """Ensure that line ending translation is not happening in the pty. 317 318 Bug: http://b/19735063 319 """ 320 output = self.device.shell(['uname'])[0] 321 self.assertEqual(output, 'Linux' + self.device.linesep) 322 323 def test_pty_logic(self): 324 """Tests that a PTY is allocated when it should be. 325 326 PTY allocation behavior should match ssh. 327 """ 328 def check_pty(args): 329 """Checks adb shell PTY allocation. 330 331 Tests |args| for terminal and non-terminal stdin. 332 333 Args: 334 args: -Tt args in a list (e.g. ['-t', '-t']). 335 336 Returns: 337 A tuple (<terminal>, <non-terminal>). True indicates 338 the corresponding shell allocated a remote PTY. 339 """ 340 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 341 342 terminal = subprocess.Popen( 343 test_cmd, stdin=None, 344 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 345 terminal.communicate() 346 347 non_terminal = subprocess.Popen( 348 test_cmd, stdin=subprocess.PIPE, 349 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 350 non_terminal.communicate() 351 352 return (terminal.returncode == 0, non_terminal.returncode == 0) 353 354 # -T: never allocate PTY. 355 self.assertEqual((False, False), check_pty(['-T'])) 356 357 # These tests require a new device. 358 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 359 # No args: PTY only if stdin is a terminal and shell is interactive, 360 # which is difficult to reliably test from a script. 361 self.assertEqual((False, False), check_pty([])) 362 363 # -t: PTY if stdin is a terminal. 364 self.assertEqual((True, False), check_pty(['-t'])) 365 366 # -t -t: always allocate PTY. 367 self.assertEqual((True, True), check_pty(['-t', '-t'])) 368 369 # -tt: always allocate PTY, POSIX style (http://b/32216152). 370 self.assertEqual((True, True), check_pty(['-tt'])) 371 372 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 373 # we follow the man page instead. 374 self.assertEqual((True, True), check_pty(['-ttt'])) 375 376 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 377 self.assertEqual((True, True), check_pty(['-ttx'])) 378 379 # -Ttt: -tt cancels out -T. 380 self.assertEqual((True, True), check_pty(['-Ttt'])) 381 382 # -ttT: -T cancels out -tt. 383 self.assertEqual((False, False), check_pty(['-ttT'])) 384 385 def test_shell_protocol(self): 386 """Tests the shell protocol on the device. 387 388 If the device supports shell protocol, this gives us the ability 389 to separate stdout/stderr and return the exit code directly. 390 391 Bug: http://b/19734861 392 """ 393 if not self.device.has_shell_protocol(): 394 raise unittest.SkipTest('shell protocol unsupported on this device') 395 396 # Shell protocol should be used by default. 397 result = self.device.shell_nocheck( 398 shlex.split('echo foo; echo bar >&2; exit 17')) 399 self.assertEqual(17, result[0]) 400 self.assertEqual('foo' + self.device.linesep, result[1]) 401 self.assertEqual('bar' + self.device.linesep, result[2]) 402 403 self.assertEqual(17, self._interactive_shell([], 'exit 17')) 404 405 # -x flag should disable shell protocol. 406 result = self.device.shell_nocheck( 407 shlex.split('-x echo foo; echo bar >&2; exit 17')) 408 self.assertEqual(0, result[0]) 409 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 410 self.assertEqual('', result[2]) 411 412 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) 413 414 def test_non_interactive_sigint(self): 415 """Tests that SIGINT in a non-interactive shell kills the process. 416 417 This requires the shell protocol in order to detect the broken 418 pipe; raw data transfer mode will only see the break once the 419 subprocess tries to read or write. 420 421 Bug: http://b/23825725 422 """ 423 if not self.device.has_shell_protocol(): 424 raise unittest.SkipTest('shell protocol unsupported on this device') 425 426 # Start a long-running process. 427 sleep_proc = subprocess.Popen( 428 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 429 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 430 stderr=subprocess.STDOUT) 431 remote_pid = sleep_proc.stdout.readline().strip() 432 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 433 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 434 435 # Verify that the process is running, send signal, verify it stopped. 436 self.device.shell(proc_query) 437 os.kill(sleep_proc.pid, signal.SIGINT) 438 sleep_proc.communicate() 439 440 # It can take some time for the process to receive the signal and die. 441 end_time = time.time() + 3 442 while self.device.shell_nocheck(proc_query)[0] != 1: 443 self.assertFalse(time.time() > end_time, 444 'subprocess failed to terminate in time') 445 446 def test_non_interactive_stdin(self): 447 """Tests that non-interactive shells send stdin.""" 448 if not self.device.has_shell_protocol(): 449 raise unittest.SkipTest('non-interactive stdin unsupported ' 450 'on this device') 451 452 # Test both small and large inputs. 453 small_input = 'foo' 454 large_input = '\n'.join(c * 100 for c in (string.ascii_letters + 455 string.digits)) 456 457 for input in (small_input, large_input): 458 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 459 stdin=subprocess.PIPE, 460 stdout=subprocess.PIPE, 461 stderr=subprocess.PIPE) 462 stdout, stderr = proc.communicate(input) 463 self.assertEqual(input.splitlines(), stdout.splitlines()) 464 self.assertEqual('', stderr) 465 466 def test_sighup(self): 467 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 468 log_path = "/data/local/tmp/adb_signal_test.log" 469 470 # Clear the output file. 471 self.device.shell_nocheck(["echo", ">", log_path]) 472 473 script = """ 474 trap "echo SIGINT > {path}; exit 0" SIGINT 475 trap "echo SIGHUP > {path}; exit 0" SIGHUP 476 echo Waiting 477 read 478 """.format(path=log_path) 479 480 script = ";".join([x.strip() for x in script.strip().splitlines()]) 481 482 process = self.device.shell_popen([script], kill_atexit=False, 483 stdin=subprocess.PIPE, 484 stdout=subprocess.PIPE) 485 486 self.assertEqual("Waiting\n", process.stdout.readline()) 487 process.send_signal(signal.SIGINT) 488 process.wait() 489 490 # Waiting for the local adb to finish is insufficient, since it hangs 491 # up immediately. 492 time.sleep(1) 493 494 stdout, _ = self.device.shell(["cat", log_path]) 495 self.assertEqual(stdout.strip(), "SIGHUP") 496 497 def test_exit_stress(self): 498 """Hammer `adb shell exit 42` with multiple threads.""" 499 thread_count = 48 500 result = dict() 501 def hammer(thread_idx, thread_count, result): 502 success = True 503 for i in range(thread_idx, 240, thread_count): 504 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 505 if ret != i % 256: 506 success = False 507 break 508 result[thread_idx] = success 509 510 threads = [] 511 for i in range(thread_count): 512 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 513 thread.start() 514 threads.append(thread) 515 for thread in threads: 516 thread.join() 517 for i, success in result.iteritems(): 518 self.assertTrue(success) 519 520 521class ArgumentEscapingTest(DeviceTest): 522 def test_shell_escaping(self): 523 """Make sure that argument escaping is somewhat sane.""" 524 525 # http://b/19734868 526 # Note that this actually matches ssh(1)'s behavior --- it's 527 # converted to `sh -c echo hello; echo world` which sh interprets 528 # as `sh -c echo` (with an argument to that shell of "hello"), 529 # and then `echo world` back in the first shell. 530 result = self.device.shell( 531 shlex.split("sh -c 'echo hello; echo world'"))[0] 532 result = result.splitlines() 533 self.assertEqual(['', 'world'], result) 534 # If you really wanted "hello" and "world", here's what you'd do: 535 result = self.device.shell( 536 shlex.split(r'echo hello\;echo world'))[0].splitlines() 537 self.assertEqual(['hello', 'world'], result) 538 539 # http://b/15479704 540 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 541 self.assertEqual('t', result) 542 result = self.device.shell( 543 shlex.split("sh -c 'true && echo t'"))[0].strip() 544 self.assertEqual('t', result) 545 546 # http://b/20564385 547 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 548 self.assertEqual('t', result) 549 result = self.device.shell( 550 shlex.split(r'echo -n 123\;uname'))[0].strip() 551 self.assertEqual('123Linux', result) 552 553 def test_install_argument_escaping(self): 554 """Make sure that install argument escaping works.""" 555 # http://b/20323053, http://b/3090932. 556 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): 557 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 558 delete=False) 559 tf.close() 560 561 # Installing bogus .apks fails if the device supports exit codes. 562 try: 563 output = self.device.install(tf.name) 564 except subprocess.CalledProcessError as e: 565 output = e.output 566 567 self.assertIn(file_suffix, output) 568 os.remove(tf.name) 569 570 571class RootUnrootTest(DeviceTest): 572 def _test_root(self): 573 message = self.device.root() 574 if 'adbd cannot run as root in production builds' in message: 575 return 576 self.device.wait() 577 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 578 579 def _test_unroot(self): 580 self.device.unroot() 581 self.device.wait() 582 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 583 584 def test_root_unroot(self): 585 """Make sure that adb root and adb unroot work, using id(1).""" 586 if self.device.get_prop('ro.debuggable') != '1': 587 raise unittest.SkipTest('requires rootable build') 588 589 original_user = self.device.shell(['id', '-un'])[0].strip() 590 try: 591 if original_user == 'root': 592 self._test_unroot() 593 self._test_root() 594 elif original_user == 'shell': 595 self._test_root() 596 self._test_unroot() 597 finally: 598 if original_user == 'root': 599 self.device.root() 600 else: 601 self.device.unroot() 602 self.device.wait() 603 604 605class TcpIpTest(DeviceTest): 606 def test_tcpip_failure_raises(self): 607 """adb tcpip requires a port. 608 609 Bug: http://b/22636927 610 """ 611 self.assertRaises( 612 subprocess.CalledProcessError, self.device.tcpip, '') 613 self.assertRaises( 614 subprocess.CalledProcessError, self.device.tcpip, 'foo') 615 616 617class SystemPropertiesTest(DeviceTest): 618 def test_get_prop(self): 619 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 620 621 @requires_root 622 def test_set_prop(self): 623 prop_name = 'foo.bar' 624 self.device.shell(['setprop', prop_name, '""']) 625 626 self.device.set_prop(prop_name, 'qux') 627 self.assertEqual( 628 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 629 630 631def compute_md5(string): 632 hsh = hashlib.md5() 633 hsh.update(string) 634 return hsh.hexdigest() 635 636 637def get_md5_prog(device): 638 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 639 try: 640 device.shell(['md5sum', '/proc/uptime']) 641 return 'md5sum' 642 except adb.ShellError: 643 return 'md5' 644 645 646class HostFile(object): 647 def __init__(self, handle, checksum): 648 self.handle = handle 649 self.checksum = checksum 650 self.full_path = handle.name 651 self.base_name = os.path.basename(self.full_path) 652 653 654class DeviceFile(object): 655 def __init__(self, checksum, full_path): 656 self.checksum = checksum 657 self.full_path = full_path 658 self.base_name = posixpath.basename(self.full_path) 659 660 661def make_random_host_files(in_dir, num_files): 662 min_size = 1 * (1 << 10) 663 max_size = 16 * (1 << 10) 664 665 files = [] 666 for _ in xrange(num_files): 667 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 668 669 size = random.randrange(min_size, max_size, 1024) 670 rand_str = os.urandom(size) 671 file_handle.write(rand_str) 672 file_handle.flush() 673 file_handle.close() 674 675 md5 = compute_md5(rand_str) 676 files.append(HostFile(file_handle, md5)) 677 return files 678 679 680def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 681 min_size = 1 * (1 << 10) 682 max_size = 16 * (1 << 10) 683 684 files = [] 685 for file_num in xrange(num_files): 686 size = random.randrange(min_size, max_size, 1024) 687 688 base_name = prefix + str(file_num) 689 full_path = posixpath.join(in_dir, base_name) 690 691 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 692 'bs={}'.format(size), 'count=1']) 693 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 694 695 files.append(DeviceFile(dev_md5, full_path)) 696 return files 697 698 699class FileOperationsTest(DeviceTest): 700 SCRATCH_DIR = '/data/local/tmp' 701 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 702 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 703 704 def _verify_remote(self, checksum, remote_path): 705 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 706 remote_path])[0].split() 707 self.assertEqual(checksum, dev_md5) 708 709 def _verify_local(self, checksum, local_path): 710 with open(local_path, 'rb') as host_file: 711 host_md5 = compute_md5(host_file.read()) 712 self.assertEqual(host_md5, checksum) 713 714 def test_push(self): 715 """Push a randomly generated file to specified device.""" 716 kbytes = 512 717 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 718 rand_str = os.urandom(1024 * kbytes) 719 tmp.write(rand_str) 720 tmp.close() 721 722 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 723 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 724 725 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 726 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 727 728 os.remove(tmp.name) 729 730 def test_push_dir(self): 731 """Push a randomly generated directory of files to the device.""" 732 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 733 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 734 735 try: 736 host_dir = tempfile.mkdtemp() 737 738 # Make sure the temp directory isn't setuid, or else adb will complain. 739 os.chmod(host_dir, 0o700) 740 741 # Create 32 random files. 742 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 743 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 744 745 for temp_file in temp_files: 746 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 747 os.path.basename(host_dir), 748 temp_file.base_name) 749 self._verify_remote(temp_file.checksum, remote_path) 750 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 751 finally: 752 if host_dir is not None: 753 shutil.rmtree(host_dir) 754 755 @unittest.expectedFailure # b/25566053 756 def test_push_empty(self): 757 """Push a directory containing an empty directory to the device.""" 758 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 759 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 760 761 try: 762 host_dir = tempfile.mkdtemp() 763 764 # Make sure the temp directory isn't setuid, or else adb will complain. 765 os.chmod(host_dir, 0o700) 766 767 # Create an empty directory. 768 os.mkdir(os.path.join(host_dir, 'empty')) 769 770 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 771 772 test_empty_cmd = ['[', '-d', 773 os.path.join(self.DEVICE_TEMP_DIR, 'empty')] 774 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 775 self.assertEqual(rc, 0) 776 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 777 finally: 778 if host_dir is not None: 779 shutil.rmtree(host_dir) 780 781 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 782 def test_push_symlink(self): 783 """Push a symlink. 784 785 Bug: http://b/31491920 786 """ 787 try: 788 host_dir = tempfile.mkdtemp() 789 790 # Make sure the temp directory isn't setuid, or else adb will 791 # complain. 792 os.chmod(host_dir, 0o700) 793 794 with open(os.path.join(host_dir, 'foo'), 'w') as f: 795 f.write('foo') 796 797 symlink_path = os.path.join(host_dir, 'symlink') 798 os.symlink('foo', symlink_path) 799 800 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 801 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 802 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 803 rc, out, _ = self.device.shell_nocheck( 804 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 805 self.assertEqual(0, rc) 806 self.assertEqual(out.strip(), 'foo') 807 finally: 808 if host_dir is not None: 809 shutil.rmtree(host_dir) 810 811 def test_multiple_push(self): 812 """Push multiple files to the device in one adb push command. 813 814 Bug: http://b/25324823 815 """ 816 817 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 818 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 819 820 try: 821 host_dir = tempfile.mkdtemp() 822 823 # Create some random files and a subdirectory containing more files. 824 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 825 826 subdir = os.path.join(host_dir, 'subdir') 827 os.mkdir(subdir) 828 subdir_temp_files = make_random_host_files(in_dir=subdir, 829 num_files=4) 830 831 paths = map(lambda temp_file: temp_file.full_path, temp_files) 832 paths.append(subdir) 833 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 834 835 for temp_file in temp_files: 836 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 837 temp_file.base_name) 838 self._verify_remote(temp_file.checksum, remote_path) 839 840 for subdir_temp_file in subdir_temp_files: 841 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 842 # BROKEN: http://b/25394682 843 # 'subdir'; 844 temp_file.base_name) 845 self._verify_remote(temp_file.checksum, remote_path) 846 847 848 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 849 finally: 850 if host_dir is not None: 851 shutil.rmtree(host_dir) 852 853 @requires_non_root 854 def test_push_error_reporting(self): 855 """Make sure that errors that occur while pushing a file get reported 856 857 Bug: http://b/26816782 858 """ 859 with tempfile.NamedTemporaryFile() as tmp_file: 860 tmp_file.write('\0' * 1024 * 1024) 861 tmp_file.flush() 862 try: 863 self.device.push(local=tmp_file.name, remote='/system/') 864 self.fail('push should not have succeeded') 865 except subprocess.CalledProcessError as e: 866 output = e.output 867 868 self.assertTrue('Permission denied' in output or 869 'Read-only file system' in output) 870 871 def _test_pull(self, remote_file, checksum): 872 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 873 tmp_write.close() 874 self.device.pull(remote=remote_file, local=tmp_write.name) 875 with open(tmp_write.name, 'rb') as tmp_read: 876 host_contents = tmp_read.read() 877 host_md5 = compute_md5(host_contents) 878 self.assertEqual(checksum, host_md5) 879 os.remove(tmp_write.name) 880 881 @requires_non_root 882 def test_pull_error_reporting(self): 883 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 884 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 885 886 try: 887 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 888 except subprocess.CalledProcessError as e: 889 output = e.output 890 891 self.assertIn('Permission denied', output) 892 893 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 894 895 def test_pull(self): 896 """Pull a randomly generated file from specified device.""" 897 kbytes = 512 898 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 899 cmd = ['dd', 'if=/dev/urandom', 900 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 901 'count={}'.format(kbytes)] 902 self.device.shell(cmd) 903 dev_md5, _ = self.device.shell( 904 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 905 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 906 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 907 908 def test_pull_dir(self): 909 """Pull a randomly generated directory of files from the device.""" 910 try: 911 host_dir = tempfile.mkdtemp() 912 913 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 914 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 915 916 # Populate device directory with random files. 917 temp_files = make_random_device_files( 918 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 919 920 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 921 922 for temp_file in temp_files: 923 host_path = os.path.join( 924 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 925 temp_file.base_name) 926 self._verify_local(temp_file.checksum, host_path) 927 928 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 929 finally: 930 if host_dir is not None: 931 shutil.rmtree(host_dir) 932 933 def test_pull_dir_symlink(self): 934 """Pull a directory into a symlink to a directory. 935 936 Bug: http://b/27362811 937 """ 938 if os.name != 'posix': 939 raise unittest.SkipTest('requires POSIX') 940 941 try: 942 host_dir = tempfile.mkdtemp() 943 real_dir = os.path.join(host_dir, 'dir') 944 symlink = os.path.join(host_dir, 'symlink') 945 os.mkdir(real_dir) 946 os.symlink(real_dir, symlink) 947 948 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 949 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 950 951 # Populate device directory with random files. 952 temp_files = make_random_device_files( 953 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 954 955 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 956 957 for temp_file in temp_files: 958 host_path = os.path.join( 959 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 960 temp_file.base_name) 961 self._verify_local(temp_file.checksum, host_path) 962 963 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 964 finally: 965 if host_dir is not None: 966 shutil.rmtree(host_dir) 967 968 def test_pull_dir_symlink_collision(self): 969 """Pull a directory into a colliding symlink to directory.""" 970 if os.name != 'posix': 971 raise unittest.SkipTest('requires POSIX') 972 973 try: 974 host_dir = tempfile.mkdtemp() 975 real_dir = os.path.join(host_dir, 'real') 976 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 977 symlink = os.path.join(host_dir, tmp_dirname) 978 os.mkdir(real_dir) 979 os.symlink(real_dir, symlink) 980 981 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 982 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 983 984 # Populate device directory with random files. 985 temp_files = make_random_device_files( 986 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 987 988 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 989 990 for temp_file in temp_files: 991 host_path = os.path.join(real_dir, temp_file.base_name) 992 self._verify_local(temp_file.checksum, host_path) 993 994 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 995 finally: 996 if host_dir is not None: 997 shutil.rmtree(host_dir) 998 999 def test_pull_dir_nonexistent(self): 1000 """Pull a directory of files from the device to a nonexistent path.""" 1001 try: 1002 host_dir = tempfile.mkdtemp() 1003 dest_dir = os.path.join(host_dir, 'dest') 1004 1005 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1006 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1007 1008 # Populate device directory with random files. 1009 temp_files = make_random_device_files( 1010 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1011 1012 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1013 1014 for temp_file in temp_files: 1015 host_path = os.path.join(dest_dir, temp_file.base_name) 1016 self._verify_local(temp_file.checksum, host_path) 1017 1018 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1019 finally: 1020 if host_dir is not None: 1021 shutil.rmtree(host_dir) 1022 1023 def test_pull_symlink_dir(self): 1024 """Pull a symlink to a directory of symlinks to files.""" 1025 try: 1026 host_dir = tempfile.mkdtemp() 1027 1028 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1029 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1030 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1031 1032 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1033 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1034 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1035 1036 # Populate device directory with random files. 1037 temp_files = make_random_device_files( 1038 self.device, in_dir=remote_dir, num_files=32) 1039 1040 for temp_file in temp_files: 1041 self.device.shell( 1042 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1043 posixpath.join(remote_links, temp_file.base_name)]) 1044 1045 self.device.pull(remote=remote_symlink, local=host_dir) 1046 1047 for temp_file in temp_files: 1048 host_path = os.path.join( 1049 host_dir, 'symlink', temp_file.base_name) 1050 self._verify_local(temp_file.checksum, host_path) 1051 1052 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1053 finally: 1054 if host_dir is not None: 1055 shutil.rmtree(host_dir) 1056 1057 def test_pull_empty(self): 1058 """Pull a directory containing an empty directory from the device.""" 1059 try: 1060 host_dir = tempfile.mkdtemp() 1061 1062 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1063 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1064 self.device.shell(['mkdir', '-p', remote_empty_path]) 1065 1066 self.device.pull(remote=remote_empty_path, local=host_dir) 1067 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1068 finally: 1069 if host_dir is not None: 1070 shutil.rmtree(host_dir) 1071 1072 def test_multiple_pull(self): 1073 """Pull a randomly generated directory of files from the device.""" 1074 1075 try: 1076 host_dir = tempfile.mkdtemp() 1077 1078 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1079 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1080 self.device.shell(['mkdir', '-p', subdir]) 1081 1082 # Create some random files and a subdirectory containing more files. 1083 temp_files = make_random_device_files( 1084 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1085 1086 subdir_temp_files = make_random_device_files( 1087 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1088 1089 paths = map(lambda temp_file: temp_file.full_path, temp_files) 1090 paths.append(subdir) 1091 self.device._simple_call(['pull'] + paths + [host_dir]) 1092 1093 for temp_file in temp_files: 1094 local_path = os.path.join(host_dir, temp_file.base_name) 1095 self._verify_local(temp_file.checksum, local_path) 1096 1097 for subdir_temp_file in subdir_temp_files: 1098 local_path = os.path.join(host_dir, 1099 'subdir', 1100 subdir_temp_file.base_name) 1101 self._verify_local(subdir_temp_file.checksum, local_path) 1102 1103 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1104 finally: 1105 if host_dir is not None: 1106 shutil.rmtree(host_dir) 1107 1108 def verify_sync(self, device, temp_files, device_dir): 1109 """Verifies that a list of temp files was synced to the device.""" 1110 # Confirm that every file on the device mirrors that on the host. 1111 for temp_file in temp_files: 1112 device_full_path = posixpath.join( 1113 device_dir, temp_file.base_name) 1114 dev_md5, _ = device.shell( 1115 [get_md5_prog(self.device), device_full_path])[0].split() 1116 self.assertEqual(temp_file.checksum, dev_md5) 1117 1118 def test_sync(self): 1119 """Sync a host directory to the data partition.""" 1120 1121 try: 1122 base_dir = tempfile.mkdtemp() 1123 1124 # Create mirror device directory hierarchy within base_dir. 1125 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1126 os.makedirs(full_dir_path) 1127 1128 # Create 32 random files within the host mirror. 1129 temp_files = make_random_host_files( 1130 in_dir=full_dir_path, num_files=32) 1131 1132 # Clean up any stale files on the device. 1133 device = adb.get_device() # pylint: disable=no-member 1134 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1135 1136 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1137 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1138 device.sync('data') 1139 if old_product_out is None: 1140 del os.environ['ANDROID_PRODUCT_OUT'] 1141 else: 1142 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1143 1144 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1145 1146 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1147 finally: 1148 if base_dir is not None: 1149 shutil.rmtree(base_dir) 1150 1151 def test_push_sync(self): 1152 """Sync a host directory to a specific path.""" 1153 1154 try: 1155 temp_dir = tempfile.mkdtemp() 1156 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1157 1158 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1159 1160 # Clean up any stale files on the device. 1161 device = adb.get_device() # pylint: disable=no-member 1162 device.shell(['rm', '-rf', device_dir]) 1163 1164 device.push(temp_dir, device_dir, sync=True) 1165 1166 self.verify_sync(device, temp_files, device_dir) 1167 1168 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1169 finally: 1170 if temp_dir is not None: 1171 shutil.rmtree(temp_dir) 1172 1173 def test_unicode_paths(self): 1174 """Ensure that we can support non-ASCII paths, even on Windows.""" 1175 name = u'로보카 폴리' 1176 1177 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1178 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1179 1180 ## push. 1181 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1182 tf.close() 1183 self.device.push(tf.name, remote_path) 1184 os.remove(tf.name) 1185 self.assertFalse(os.path.exists(tf.name)) 1186 1187 # Verify that the device ended up with the expected UTF-8 path 1188 output = self.device.shell( 1189 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1190 self.assertEqual(remote_path.encode('utf-8'), output) 1191 1192 # pull. 1193 self.device.pull(remote_path, tf.name) 1194 self.assertTrue(os.path.exists(tf.name)) 1195 os.remove(tf.name) 1196 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1197 1198 1199class DeviceOfflineTest(DeviceTest): 1200 def _get_device_state(self, serialno): 1201 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1202 for line in output.split('\n'): 1203 m = re.match('(\S+)\s+(\S+)', line) 1204 if m and m.group(1) == serialno: 1205 return m.group(2) 1206 return None 1207 1208 def disabled_test_killed_when_pushing_a_large_file(self): 1209 """ 1210 While running adb push with a large file, kill adb server. 1211 Occasionally the device becomes offline. Because the device is still 1212 reading data without realizing that the adb server has been restarted. 1213 Test if we can bring the device online automatically now. 1214 http://b/32952319 1215 """ 1216 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1217 # 1. Push a large file 1218 file_path = 'tmp_large_file' 1219 try: 1220 fh = open(file_path, 'w') 1221 fh.write('\0' * (100 * 1024 * 1024)) 1222 fh.close() 1223 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1224 time.sleep(0.1) 1225 # 2. Kill the adb server 1226 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1227 subproc.terminate() 1228 finally: 1229 try: 1230 os.unlink(file_path) 1231 except: 1232 pass 1233 # 3. See if the device still exist. 1234 # Sleep to wait for the adb server exit. 1235 time.sleep(0.5) 1236 # 4. The device should be online 1237 self.assertEqual(self._get_device_state(serialno), 'device') 1238 1239 def disabled_test_killed_when_pulling_a_large_file(self): 1240 """ 1241 While running adb pull with a large file, kill adb server. 1242 Occasionally the device can't be connected. Because the device is trying to 1243 send a message larger than what is expected by the adb server. 1244 Test if we can bring the device online automatically now. 1245 """ 1246 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1247 file_path = 'tmp_large_file' 1248 try: 1249 # 1. Create a large file on device. 1250 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1251 'bs=1000000', 'count=100']) 1252 # 2. Pull the large file on host. 1253 subproc = subprocess.Popen(self.device.adb_cmd + 1254 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1255 time.sleep(0.1) 1256 # 3. Kill the adb server 1257 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1258 subproc.terminate() 1259 finally: 1260 try: 1261 os.unlink(file_path) 1262 except: 1263 pass 1264 # 4. See if the device still exist. 1265 # Sleep to wait for the adb server exit. 1266 time.sleep(0.5) 1267 self.assertEqual(self._get_device_state(serialno), 'device') 1268 1269 1270 def test_packet_size_regression(self): 1271 """Test for http://b/37783561 1272 1273 Receiving packets of a length divisible by 512 but not 1024 resulted in 1274 the adb client waiting indefinitely for more input. 1275 """ 1276 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1277 # Probe some surrounding values as well, for the hell of it. 1278 for base in [512] + range(1024, 1024 * 16, 1024): 1279 for offset in [-6, -5, -4]: 1280 length = base + offset 1281 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1282 'echo', 'foo'] 1283 rc, stdout, _ = self.device.shell_nocheck(cmd) 1284 1285 self.assertEqual(0, rc) 1286 1287 # Output should be '\0' * length, followed by "foo\n" 1288 self.assertEqual(length, len(stdout) - 4) 1289 self.assertEqual(stdout, "\0" * length + "foo\n") 1290 1291 1292def main(): 1293 random.seed(0) 1294 if len(adb.get_devices()) > 0: 1295 suite = unittest.TestLoader().loadTestsFromName(__name__) 1296 unittest.TextTestRunner(verbosity=3).run(suite) 1297 else: 1298 print('Test suite must be run with attached devices') 1299 1300 1301if __name__ == '__main__': 1302 main() 1303