1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class DeviceTest(unittest.TestCase): 80 def setUp(self): 81 self.device = adb.get_device() 82 83 84class ForwardReverseTest(DeviceTest): 85 def _test_no_rebind(self, description, direction_list, direction, 86 direction_no_rebind, direction_remove_all): 87 msg = direction_list() 88 self.assertEqual('', msg.strip(), 89 description + ' list must be empty to run this test.') 90 91 # Use --no-rebind with no existing binding 92 direction_no_rebind('tcp:5566', 'tcp:6655') 93 msg = direction_list() 94 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 95 96 # Use --no-rebind with existing binding 97 with self.assertRaises(subprocess.CalledProcessError): 98 direction_no_rebind('tcp:5566', 'tcp:6677') 99 msg = direction_list() 100 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 101 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 102 103 # Use the absence of --no-rebind with existing binding 104 direction('tcp:5566', 'tcp:6677') 105 msg = direction_list() 106 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 107 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 108 109 direction_remove_all() 110 msg = direction_list() 111 self.assertEqual('', msg.strip()) 112 113 def test_forward_no_rebind(self): 114 self._test_no_rebind('forward', self.device.forward_list, 115 self.device.forward, self.device.forward_no_rebind, 116 self.device.forward_remove_all) 117 118 def test_reverse_no_rebind(self): 119 self._test_no_rebind('reverse', self.device.reverse_list, 120 self.device.reverse, self.device.reverse_no_rebind, 121 self.device.reverse_remove_all) 122 123 def test_forward(self): 124 msg = self.device.forward_list() 125 self.assertEqual('', msg.strip(), 126 'Forwarding list must be empty to run this test.') 127 self.device.forward('tcp:5566', 'tcp:6655') 128 msg = self.device.forward_list() 129 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 130 self.device.forward('tcp:7788', 'tcp:8877') 131 msg = self.device.forward_list() 132 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 133 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 134 self.device.forward_remove('tcp:5566') 135 msg = self.device.forward_list() 136 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 137 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 138 self.device.forward_remove_all() 139 msg = self.device.forward_list() 140 self.assertEqual('', msg.strip()) 141 142 def test_forward_old_protocol(self): 143 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 144 145 msg = self.device.forward_list() 146 self.assertEqual('', msg.strip(), 147 'Forwarding list must be empty to run this test.') 148 149 s = socket.create_connection(("localhost", 5037)) 150 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 151 cmd = b"%04x%s" % (len(service), service) 152 s.sendall(cmd) 153 154 msg = self.device.forward_list() 155 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 156 157 self.device.forward_remove_all() 158 msg = self.device.forward_list() 159 self.assertEqual('', msg.strip()) 160 161 def test_forward_tcp_port_0(self): 162 self.assertEqual('', self.device.forward_list().strip(), 163 'Forwarding list must be empty to run this test.') 164 165 try: 166 # If resolving TCP port 0 is supported, `adb forward` will print 167 # the actual port number. 168 port = self.device.forward('tcp:0', 'tcp:8888').strip() 169 if not port: 170 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 171 172 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 173 self.device.forward_list())) 174 finally: 175 self.device.forward_remove_all() 176 177 def test_reverse(self): 178 msg = self.device.reverse_list() 179 self.assertEqual('', msg.strip(), 180 'Reverse forwarding list must be empty to run this test.') 181 self.device.reverse('tcp:5566', 'tcp:6655') 182 msg = self.device.reverse_list() 183 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 184 self.device.reverse('tcp:7788', 'tcp:8877') 185 msg = self.device.reverse_list() 186 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 187 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 188 self.device.reverse_remove('tcp:5566') 189 msg = self.device.reverse_list() 190 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 191 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 192 self.device.reverse_remove_all() 193 msg = self.device.reverse_list() 194 self.assertEqual('', msg.strip()) 195 196 def test_reverse_tcp_port_0(self): 197 self.assertEqual('', self.device.reverse_list().strip(), 198 'Reverse list must be empty to run this test.') 199 200 try: 201 # If resolving TCP port 0 is supported, `adb reverse` will print 202 # the actual port number. 203 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 204 if not port: 205 raise unittest.SkipTest('Reversing tcp:0 is not available.') 206 207 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 208 self.device.reverse_list())) 209 finally: 210 self.device.reverse_remove_all() 211 212 def test_forward_reverse_echo(self): 213 """Send data through adb forward and read it back via adb reverse""" 214 forward_port = 12345 215 reverse_port = forward_port + 1 216 forward_spec = 'tcp:' + str(forward_port) 217 reverse_spec = 'tcp:' + str(reverse_port) 218 forward_setup = False 219 reverse_setup = False 220 221 try: 222 # listen on localhost:forward_port, connect to remote:forward_port 223 self.device.forward(forward_spec, forward_spec) 224 forward_setup = True 225 # listen on remote:forward_port, connect to localhost:reverse_port 226 self.device.reverse(forward_spec, reverse_spec) 227 reverse_setup = True 228 229 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 230 with contextlib.closing(listener): 231 # Use SO_REUSEADDR so that subsequent runs of the test can grab 232 # the port even if it is in TIME_WAIT. 233 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 234 235 # Listen on localhost:reverse_port before connecting to 236 # localhost:forward_port because that will cause adb to connect 237 # back to localhost:reverse_port. 238 listener.bind(('127.0.0.1', reverse_port)) 239 listener.listen(4) 240 241 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 242 with contextlib.closing(client): 243 # Connect to the listener. 244 client.connect(('127.0.0.1', forward_port)) 245 246 # Accept the client connection. 247 accepted_connection, addr = listener.accept() 248 with contextlib.closing(accepted_connection) as server: 249 data = 'hello' 250 251 # Send data into the port setup by adb forward. 252 client.sendall(data) 253 # Explicitly close() so that server gets EOF. 254 client.close() 255 256 # Verify that the data came back via adb reverse. 257 self.assertEqual(data, server.makefile().read()) 258 finally: 259 if reverse_setup: 260 self.device.reverse_remove(forward_spec) 261 if forward_setup: 262 self.device.forward_remove(forward_spec) 263 264 265class ShellTest(DeviceTest): 266 def _interactive_shell(self, shell_args, input): 267 """Runs an interactive adb shell. 268 269 Args: 270 shell_args: List of string arguments to `adb shell`. 271 input: String input to send to the interactive shell. 272 273 Returns: 274 The remote exit code. 275 276 Raises: 277 unittest.SkipTest: The device doesn't support exit codes. 278 """ 279 if not self.device.has_shell_protocol(): 280 raise unittest.SkipTest('exit codes are unavailable on this device') 281 282 proc = subprocess.Popen( 283 self.device.adb_cmd + ['shell'] + shell_args, 284 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 285 stderr=subprocess.PIPE) 286 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 287 # to explicitly add an exit command to close the session from the device 288 # side, plus the necessary newline to complete the interactive command. 289 proc.communicate(input + '; exit\n') 290 return proc.returncode 291 292 def test_cat(self): 293 """Check that we can at least cat a file.""" 294 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 295 elements = out.split() 296 self.assertEqual(len(elements), 2) 297 298 uptime, idle = elements 299 self.assertGreater(float(uptime), 0.0) 300 self.assertGreater(float(idle), 0.0) 301 302 def test_throws_on_failure(self): 303 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 304 305 def test_output_not_stripped(self): 306 out = self.device.shell(['echo', 'foo'])[0] 307 self.assertEqual(out, 'foo' + self.device.linesep) 308 309 def test_shell_command_length(self): 310 # Devices that have shell_v2 should be able to handle long commands. 311 if self.device.has_shell_protocol(): 312 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 313 self.assertEqual(rc, 0) 314 self.assertTrue(out == ('x' * 16384 + '\n')) 315 316 def test_shell_nocheck_failure(self): 317 rc, out, _ = self.device.shell_nocheck(['false']) 318 self.assertNotEqual(rc, 0) 319 self.assertEqual(out, '') 320 321 def test_shell_nocheck_output_not_stripped(self): 322 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 323 self.assertEqual(rc, 0) 324 self.assertEqual(out, 'foo' + self.device.linesep) 325 326 def test_can_distinguish_tricky_results(self): 327 # If result checking on ADB shell is naively implemented as 328 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 329 # output from the result for a cmd of `echo -n 1`. 330 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 331 self.assertEqual(rc, 0) 332 self.assertEqual(out, '1') 333 334 def test_line_endings(self): 335 """Ensure that line ending translation is not happening in the pty. 336 337 Bug: http://b/19735063 338 """ 339 output = self.device.shell(['uname'])[0] 340 self.assertEqual(output, 'Linux' + self.device.linesep) 341 342 def test_pty_logic(self): 343 """Tests that a PTY is allocated when it should be. 344 345 PTY allocation behavior should match ssh. 346 """ 347 def check_pty(args): 348 """Checks adb shell PTY allocation. 349 350 Tests |args| for terminal and non-terminal stdin. 351 352 Args: 353 args: -Tt args in a list (e.g. ['-t', '-t']). 354 355 Returns: 356 A tuple (<terminal>, <non-terminal>). True indicates 357 the corresponding shell allocated a remote PTY. 358 """ 359 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 360 361 terminal = subprocess.Popen( 362 test_cmd, stdin=None, 363 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 364 terminal.communicate() 365 366 non_terminal = subprocess.Popen( 367 test_cmd, stdin=subprocess.PIPE, 368 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 369 non_terminal.communicate() 370 371 return (terminal.returncode == 0, non_terminal.returncode == 0) 372 373 # -T: never allocate PTY. 374 self.assertEqual((False, False), check_pty(['-T'])) 375 376 # These tests require a new device. 377 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 378 # No args: PTY only if stdin is a terminal and shell is interactive, 379 # which is difficult to reliably test from a script. 380 self.assertEqual((False, False), check_pty([])) 381 382 # -t: PTY if stdin is a terminal. 383 self.assertEqual((True, False), check_pty(['-t'])) 384 385 # -t -t: always allocate PTY. 386 self.assertEqual((True, True), check_pty(['-t', '-t'])) 387 388 # -tt: always allocate PTY, POSIX style (http://b/32216152). 389 self.assertEqual((True, True), check_pty(['-tt'])) 390 391 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 392 # we follow the man page instead. 393 self.assertEqual((True, True), check_pty(['-ttt'])) 394 395 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 396 self.assertEqual((True, True), check_pty(['-ttx'])) 397 398 # -Ttt: -tt cancels out -T. 399 self.assertEqual((True, True), check_pty(['-Ttt'])) 400 401 # -ttT: -T cancels out -tt. 402 self.assertEqual((False, False), check_pty(['-ttT'])) 403 404 def test_shell_protocol(self): 405 """Tests the shell protocol on the device. 406 407 If the device supports shell protocol, this gives us the ability 408 to separate stdout/stderr and return the exit code directly. 409 410 Bug: http://b/19734861 411 """ 412 if not self.device.has_shell_protocol(): 413 raise unittest.SkipTest('shell protocol unsupported on this device') 414 415 # Shell protocol should be used by default. 416 result = self.device.shell_nocheck( 417 shlex.split('echo foo; echo bar >&2; exit 17')) 418 self.assertEqual(17, result[0]) 419 self.assertEqual('foo' + self.device.linesep, result[1]) 420 self.assertEqual('bar' + self.device.linesep, result[2]) 421 422 self.assertEqual(17, self._interactive_shell([], 'exit 17')) 423 424 # -x flag should disable shell protocol. 425 result = self.device.shell_nocheck( 426 shlex.split('-x echo foo; echo bar >&2; exit 17')) 427 self.assertEqual(0, result[0]) 428 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 429 self.assertEqual('', result[2]) 430 431 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) 432 433 def test_non_interactive_sigint(self): 434 """Tests that SIGINT in a non-interactive shell kills the process. 435 436 This requires the shell protocol in order to detect the broken 437 pipe; raw data transfer mode will only see the break once the 438 subprocess tries to read or write. 439 440 Bug: http://b/23825725 441 """ 442 if not self.device.has_shell_protocol(): 443 raise unittest.SkipTest('shell protocol unsupported on this device') 444 445 # Start a long-running process. 446 sleep_proc = subprocess.Popen( 447 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 448 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 449 stderr=subprocess.STDOUT) 450 remote_pid = sleep_proc.stdout.readline().strip() 451 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 452 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 453 454 # Verify that the process is running, send signal, verify it stopped. 455 self.device.shell(proc_query) 456 os.kill(sleep_proc.pid, signal.SIGINT) 457 sleep_proc.communicate() 458 459 # It can take some time for the process to receive the signal and die. 460 end_time = time.time() + 3 461 while self.device.shell_nocheck(proc_query)[0] != 1: 462 self.assertFalse(time.time() > end_time, 463 'subprocess failed to terminate in time') 464 465 def test_non_interactive_stdin(self): 466 """Tests that non-interactive shells send stdin.""" 467 if not self.device.has_shell_protocol(): 468 raise unittest.SkipTest('non-interactive stdin unsupported ' 469 'on this device') 470 471 # Test both small and large inputs. 472 small_input = 'foo' 473 large_input = '\n'.join(c * 100 for c in (string.ascii_letters + 474 string.digits)) 475 476 for input in (small_input, large_input): 477 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 478 stdin=subprocess.PIPE, 479 stdout=subprocess.PIPE, 480 stderr=subprocess.PIPE) 481 stdout, stderr = proc.communicate(input) 482 self.assertEqual(input.splitlines(), stdout.splitlines()) 483 self.assertEqual('', stderr) 484 485 def test_sighup(self): 486 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 487 log_path = "/data/local/tmp/adb_signal_test.log" 488 489 # Clear the output file. 490 self.device.shell_nocheck(["echo", ">", log_path]) 491 492 script = """ 493 trap "echo SIGINT > {path}; exit 0" SIGINT 494 trap "echo SIGHUP > {path}; exit 0" SIGHUP 495 echo Waiting 496 read 497 """.format(path=log_path) 498 499 script = ";".join([x.strip() for x in script.strip().splitlines()]) 500 501 process = self.device.shell_popen([script], kill_atexit=False, 502 stdin=subprocess.PIPE, 503 stdout=subprocess.PIPE) 504 505 self.assertEqual("Waiting\n", process.stdout.readline()) 506 process.send_signal(signal.SIGINT) 507 process.wait() 508 509 # Waiting for the local adb to finish is insufficient, since it hangs 510 # up immediately. 511 time.sleep(1) 512 513 stdout, _ = self.device.shell(["cat", log_path]) 514 self.assertEqual(stdout.strip(), "SIGHUP") 515 516 def test_exit_stress(self): 517 """Hammer `adb shell exit 42` with multiple threads.""" 518 thread_count = 48 519 result = dict() 520 def hammer(thread_idx, thread_count, result): 521 success = True 522 for i in range(thread_idx, 240, thread_count): 523 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 524 if ret != i % 256: 525 success = False 526 break 527 result[thread_idx] = success 528 529 threads = [] 530 for i in range(thread_count): 531 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 532 thread.start() 533 threads.append(thread) 534 for thread in threads: 535 thread.join() 536 for i, success in result.iteritems(): 537 self.assertTrue(success) 538 539 540class ArgumentEscapingTest(DeviceTest): 541 def test_shell_escaping(self): 542 """Make sure that argument escaping is somewhat sane.""" 543 544 # http://b/19734868 545 # Note that this actually matches ssh(1)'s behavior --- it's 546 # converted to `sh -c echo hello; echo world` which sh interprets 547 # as `sh -c echo` (with an argument to that shell of "hello"), 548 # and then `echo world` back in the first shell. 549 result = self.device.shell( 550 shlex.split("sh -c 'echo hello; echo world'"))[0] 551 result = result.splitlines() 552 self.assertEqual(['', 'world'], result) 553 # If you really wanted "hello" and "world", here's what you'd do: 554 result = self.device.shell( 555 shlex.split(r'echo hello\;echo world'))[0].splitlines() 556 self.assertEqual(['hello', 'world'], result) 557 558 # http://b/15479704 559 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 560 self.assertEqual('t', result) 561 result = self.device.shell( 562 shlex.split("sh -c 'true && echo t'"))[0].strip() 563 self.assertEqual('t', result) 564 565 # http://b/20564385 566 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 567 self.assertEqual('t', result) 568 result = self.device.shell( 569 shlex.split(r'echo -n 123\;uname'))[0].strip() 570 self.assertEqual('123Linux', result) 571 572 def test_install_argument_escaping(self): 573 """Make sure that install argument escaping works.""" 574 # http://b/20323053, http://b/3090932. 575 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): 576 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 577 delete=False) 578 tf.close() 579 580 # Installing bogus .apks fails if the device supports exit codes. 581 try: 582 output = self.device.install(tf.name) 583 except subprocess.CalledProcessError as e: 584 output = e.output 585 586 self.assertIn(file_suffix, output) 587 os.remove(tf.name) 588 589 590class RootUnrootTest(DeviceTest): 591 def _test_root(self): 592 message = self.device.root() 593 if 'adbd cannot run as root in production builds' in message: 594 return 595 self.device.wait() 596 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 597 598 def _test_unroot(self): 599 self.device.unroot() 600 self.device.wait() 601 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 602 603 def test_root_unroot(self): 604 """Make sure that adb root and adb unroot work, using id(1).""" 605 if self.device.get_prop('ro.debuggable') != '1': 606 raise unittest.SkipTest('requires rootable build') 607 608 original_user = self.device.shell(['id', '-un'])[0].strip() 609 try: 610 if original_user == 'root': 611 self._test_unroot() 612 self._test_root() 613 elif original_user == 'shell': 614 self._test_root() 615 self._test_unroot() 616 finally: 617 if original_user == 'root': 618 self.device.root() 619 else: 620 self.device.unroot() 621 self.device.wait() 622 623 624class TcpIpTest(DeviceTest): 625 def test_tcpip_failure_raises(self): 626 """adb tcpip requires a port. 627 628 Bug: http://b/22636927 629 """ 630 self.assertRaises( 631 subprocess.CalledProcessError, self.device.tcpip, '') 632 self.assertRaises( 633 subprocess.CalledProcessError, self.device.tcpip, 'foo') 634 635 636class SystemPropertiesTest(DeviceTest): 637 def test_get_prop(self): 638 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 639 640 @requires_root 641 def test_set_prop(self): 642 prop_name = 'foo.bar' 643 self.device.shell(['setprop', prop_name, '""']) 644 645 self.device.set_prop(prop_name, 'qux') 646 self.assertEqual( 647 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 648 649 650def compute_md5(string): 651 hsh = hashlib.md5() 652 hsh.update(string) 653 return hsh.hexdigest() 654 655 656def get_md5_prog(device): 657 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 658 try: 659 device.shell(['md5sum', '/proc/uptime']) 660 return 'md5sum' 661 except adb.ShellError: 662 return 'md5' 663 664 665class HostFile(object): 666 def __init__(self, handle, checksum): 667 self.handle = handle 668 self.checksum = checksum 669 self.full_path = handle.name 670 self.base_name = os.path.basename(self.full_path) 671 672 673class DeviceFile(object): 674 def __init__(self, checksum, full_path): 675 self.checksum = checksum 676 self.full_path = full_path 677 self.base_name = posixpath.basename(self.full_path) 678 679 680def make_random_host_files(in_dir, num_files): 681 min_size = 1 * (1 << 10) 682 max_size = 16 * (1 << 10) 683 684 files = [] 685 for _ in xrange(num_files): 686 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 687 688 size = random.randrange(min_size, max_size, 1024) 689 rand_str = os.urandom(size) 690 file_handle.write(rand_str) 691 file_handle.flush() 692 file_handle.close() 693 694 md5 = compute_md5(rand_str) 695 files.append(HostFile(file_handle, md5)) 696 return files 697 698 699def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 700 min_size = 1 * (1 << 10) 701 max_size = 16 * (1 << 10) 702 703 files = [] 704 for file_num in xrange(num_files): 705 size = random.randrange(min_size, max_size, 1024) 706 707 base_name = prefix + str(file_num) 708 full_path = posixpath.join(in_dir, base_name) 709 710 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 711 'bs={}'.format(size), 'count=1']) 712 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 713 714 files.append(DeviceFile(dev_md5, full_path)) 715 return files 716 717 718class FileOperationsTest(DeviceTest): 719 SCRATCH_DIR = '/data/local/tmp' 720 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 721 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 722 723 def _verify_remote(self, checksum, remote_path): 724 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 725 remote_path])[0].split() 726 self.assertEqual(checksum, dev_md5) 727 728 def _verify_local(self, checksum, local_path): 729 with open(local_path, 'rb') as host_file: 730 host_md5 = compute_md5(host_file.read()) 731 self.assertEqual(host_md5, checksum) 732 733 def test_push(self): 734 """Push a randomly generated file to specified device.""" 735 kbytes = 512 736 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 737 rand_str = os.urandom(1024 * kbytes) 738 tmp.write(rand_str) 739 tmp.close() 740 741 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 742 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 743 744 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 745 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 746 747 os.remove(tmp.name) 748 749 def test_push_dir(self): 750 """Push a randomly generated directory of files to the device.""" 751 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 752 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 753 754 try: 755 host_dir = tempfile.mkdtemp() 756 757 # Make sure the temp directory isn't setuid, or else adb will complain. 758 os.chmod(host_dir, 0o700) 759 760 # Create 32 random files. 761 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 762 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 763 764 for temp_file in temp_files: 765 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 766 os.path.basename(host_dir), 767 temp_file.base_name) 768 self._verify_remote(temp_file.checksum, remote_path) 769 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 770 finally: 771 if host_dir is not None: 772 shutil.rmtree(host_dir) 773 774 def disabled_test_push_empty(self): 775 """Push an empty directory to the device.""" 776 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 777 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 778 779 try: 780 host_dir = tempfile.mkdtemp() 781 782 # Make sure the temp directory isn't setuid, or else adb will complain. 783 os.chmod(host_dir, 0o700) 784 785 # Create an empty directory. 786 empty_dir_path = os.path.join(host_dir, 'empty') 787 os.mkdir(empty_dir_path); 788 789 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 790 791 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 792 test_empty_cmd = ["[", "-d", remote_path, "]"] 793 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 794 795 self.assertEqual(rc, 0) 796 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 797 finally: 798 if host_dir is not None: 799 shutil.rmtree(host_dir) 800 801 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 802 def test_push_symlink(self): 803 """Push a symlink. 804 805 Bug: http://b/31491920 806 """ 807 try: 808 host_dir = tempfile.mkdtemp() 809 810 # Make sure the temp directory isn't setuid, or else adb will 811 # complain. 812 os.chmod(host_dir, 0o700) 813 814 with open(os.path.join(host_dir, 'foo'), 'w') as f: 815 f.write('foo') 816 817 symlink_path = os.path.join(host_dir, 'symlink') 818 os.symlink('foo', symlink_path) 819 820 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 821 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 822 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 823 rc, out, _ = self.device.shell_nocheck( 824 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 825 self.assertEqual(0, rc) 826 self.assertEqual(out.strip(), 'foo') 827 finally: 828 if host_dir is not None: 829 shutil.rmtree(host_dir) 830 831 def test_multiple_push(self): 832 """Push multiple files to the device in one adb push command. 833 834 Bug: http://b/25324823 835 """ 836 837 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 838 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 839 840 try: 841 host_dir = tempfile.mkdtemp() 842 843 # Create some random files and a subdirectory containing more files. 844 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 845 846 subdir = os.path.join(host_dir, 'subdir') 847 os.mkdir(subdir) 848 subdir_temp_files = make_random_host_files(in_dir=subdir, 849 num_files=4) 850 851 paths = map(lambda temp_file: temp_file.full_path, temp_files) 852 paths.append(subdir) 853 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 854 855 for temp_file in temp_files: 856 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 857 temp_file.base_name) 858 self._verify_remote(temp_file.checksum, remote_path) 859 860 for subdir_temp_file in subdir_temp_files: 861 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 862 # BROKEN: http://b/25394682 863 # 'subdir'; 864 temp_file.base_name) 865 self._verify_remote(temp_file.checksum, remote_path) 866 867 868 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 869 finally: 870 if host_dir is not None: 871 shutil.rmtree(host_dir) 872 873 @requires_non_root 874 def test_push_error_reporting(self): 875 """Make sure that errors that occur while pushing a file get reported 876 877 Bug: http://b/26816782 878 """ 879 with tempfile.NamedTemporaryFile() as tmp_file: 880 tmp_file.write('\0' * 1024 * 1024) 881 tmp_file.flush() 882 try: 883 self.device.push(local=tmp_file.name, remote='/system/') 884 self.fail('push should not have succeeded') 885 except subprocess.CalledProcessError as e: 886 output = e.output 887 888 self.assertTrue('Permission denied' in output or 889 'Read-only file system' in output) 890 891 @requires_non_root 892 def test_push_directory_creation(self): 893 """Regression test for directory creation. 894 895 Bug: http://b/110953234 896 """ 897 with tempfile.NamedTemporaryFile() as tmp_file: 898 tmp_file.write('\0' * 1024 * 1024) 899 tmp_file.flush() 900 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 901 self.device.shell(['rm', '-rf', remote_path]) 902 903 remote_path += '/filename' 904 self.device.push(local=tmp_file.name, remote=remote_path) 905 906 def disabled_test_push_multiple_slash_root(self): 907 """Regression test for pushing to //data/local/tmp. 908 909 Bug: http://b/141311284 910 911 Disabled because this broken on the adbd side as well: b/141943968 912 """ 913 with tempfile.NamedTemporaryFile() as tmp_file: 914 tmp_file.write('\0' * 1024 * 1024) 915 tmp_file.flush() 916 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 917 self.device.shell(['rm', '-rf', remote_path]) 918 self.device.push(local=tmp_file.name, remote=remote_path) 919 920 def _test_pull(self, remote_file, checksum): 921 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 922 tmp_write.close() 923 self.device.pull(remote=remote_file, local=tmp_write.name) 924 with open(tmp_write.name, 'rb') as tmp_read: 925 host_contents = tmp_read.read() 926 host_md5 = compute_md5(host_contents) 927 self.assertEqual(checksum, host_md5) 928 os.remove(tmp_write.name) 929 930 @requires_non_root 931 def test_pull_error_reporting(self): 932 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 933 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 934 935 try: 936 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 937 except subprocess.CalledProcessError as e: 938 output = e.output 939 940 self.assertIn('Permission denied', output) 941 942 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 943 944 def test_pull(self): 945 """Pull a randomly generated file from specified device.""" 946 kbytes = 512 947 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 948 cmd = ['dd', 'if=/dev/urandom', 949 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 950 'count={}'.format(kbytes)] 951 self.device.shell(cmd) 952 dev_md5, _ = self.device.shell( 953 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 954 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 955 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 956 957 def test_pull_dir(self): 958 """Pull a randomly generated directory of files from the device.""" 959 try: 960 host_dir = tempfile.mkdtemp() 961 962 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 963 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 964 965 # Populate device directory with random files. 966 temp_files = make_random_device_files( 967 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 968 969 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 970 971 for temp_file in temp_files: 972 host_path = os.path.join( 973 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 974 temp_file.base_name) 975 self._verify_local(temp_file.checksum, host_path) 976 977 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 978 finally: 979 if host_dir is not None: 980 shutil.rmtree(host_dir) 981 982 def test_pull_dir_symlink(self): 983 """Pull a directory into a symlink to a directory. 984 985 Bug: http://b/27362811 986 """ 987 if os.name != 'posix': 988 raise unittest.SkipTest('requires POSIX') 989 990 try: 991 host_dir = tempfile.mkdtemp() 992 real_dir = os.path.join(host_dir, 'dir') 993 symlink = os.path.join(host_dir, 'symlink') 994 os.mkdir(real_dir) 995 os.symlink(real_dir, symlink) 996 997 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 998 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 999 1000 # Populate device directory with random files. 1001 temp_files = make_random_device_files( 1002 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1003 1004 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1005 1006 for temp_file in temp_files: 1007 host_path = os.path.join( 1008 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1009 temp_file.base_name) 1010 self._verify_local(temp_file.checksum, host_path) 1011 1012 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1013 finally: 1014 if host_dir is not None: 1015 shutil.rmtree(host_dir) 1016 1017 def test_pull_dir_symlink_collision(self): 1018 """Pull a directory into a colliding symlink to directory.""" 1019 if os.name != 'posix': 1020 raise unittest.SkipTest('requires POSIX') 1021 1022 try: 1023 host_dir = tempfile.mkdtemp() 1024 real_dir = os.path.join(host_dir, 'real') 1025 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1026 symlink = os.path.join(host_dir, tmp_dirname) 1027 os.mkdir(real_dir) 1028 os.symlink(real_dir, symlink) 1029 1030 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1031 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1032 1033 # Populate device directory with random files. 1034 temp_files = make_random_device_files( 1035 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1036 1037 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1038 1039 for temp_file in temp_files: 1040 host_path = os.path.join(real_dir, temp_file.base_name) 1041 self._verify_local(temp_file.checksum, host_path) 1042 1043 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1044 finally: 1045 if host_dir is not None: 1046 shutil.rmtree(host_dir) 1047 1048 def test_pull_dir_nonexistent(self): 1049 """Pull a directory of files from the device to a nonexistent path.""" 1050 try: 1051 host_dir = tempfile.mkdtemp() 1052 dest_dir = os.path.join(host_dir, 'dest') 1053 1054 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1055 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1056 1057 # Populate device directory with random files. 1058 temp_files = make_random_device_files( 1059 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1060 1061 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1062 1063 for temp_file in temp_files: 1064 host_path = os.path.join(dest_dir, temp_file.base_name) 1065 self._verify_local(temp_file.checksum, host_path) 1066 1067 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1068 finally: 1069 if host_dir is not None: 1070 shutil.rmtree(host_dir) 1071 1072 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1073 def disabled_test_pull_symlink_dir(self): 1074 """Pull a symlink to a directory of symlinks to files.""" 1075 try: 1076 host_dir = tempfile.mkdtemp() 1077 1078 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1079 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1080 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1081 1082 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1083 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1084 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1085 1086 # Populate device directory with random files. 1087 temp_files = make_random_device_files( 1088 self.device, in_dir=remote_dir, num_files=32) 1089 1090 for temp_file in temp_files: 1091 self.device.shell( 1092 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1093 posixpath.join(remote_links, temp_file.base_name)]) 1094 1095 self.device.pull(remote=remote_symlink, local=host_dir) 1096 1097 for temp_file in temp_files: 1098 host_path = os.path.join( 1099 host_dir, 'symlink', temp_file.base_name) 1100 self._verify_local(temp_file.checksum, host_path) 1101 1102 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1103 finally: 1104 if host_dir is not None: 1105 shutil.rmtree(host_dir) 1106 1107 def test_pull_empty(self): 1108 """Pull a directory containing an empty directory from the device.""" 1109 try: 1110 host_dir = tempfile.mkdtemp() 1111 1112 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1113 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1114 self.device.shell(['mkdir', '-p', remote_empty_path]) 1115 1116 self.device.pull(remote=remote_empty_path, local=host_dir) 1117 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1118 finally: 1119 if host_dir is not None: 1120 shutil.rmtree(host_dir) 1121 1122 def test_multiple_pull(self): 1123 """Pull a randomly generated directory of files from the device.""" 1124 1125 try: 1126 host_dir = tempfile.mkdtemp() 1127 1128 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1129 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1130 self.device.shell(['mkdir', '-p', subdir]) 1131 1132 # Create some random files and a subdirectory containing more files. 1133 temp_files = make_random_device_files( 1134 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1135 1136 subdir_temp_files = make_random_device_files( 1137 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1138 1139 paths = map(lambda temp_file: temp_file.full_path, temp_files) 1140 paths.append(subdir) 1141 self.device._simple_call(['pull'] + paths + [host_dir]) 1142 1143 for temp_file in temp_files: 1144 local_path = os.path.join(host_dir, temp_file.base_name) 1145 self._verify_local(temp_file.checksum, local_path) 1146 1147 for subdir_temp_file in subdir_temp_files: 1148 local_path = os.path.join(host_dir, 1149 'subdir', 1150 subdir_temp_file.base_name) 1151 self._verify_local(subdir_temp_file.checksum, local_path) 1152 1153 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1154 finally: 1155 if host_dir is not None: 1156 shutil.rmtree(host_dir) 1157 1158 def verify_sync(self, device, temp_files, device_dir): 1159 """Verifies that a list of temp files was synced to the device.""" 1160 # Confirm that every file on the device mirrors that on the host. 1161 for temp_file in temp_files: 1162 device_full_path = posixpath.join( 1163 device_dir, temp_file.base_name) 1164 dev_md5, _ = device.shell( 1165 [get_md5_prog(self.device), device_full_path])[0].split() 1166 self.assertEqual(temp_file.checksum, dev_md5) 1167 1168 def test_sync(self): 1169 """Sync a host directory to the data partition.""" 1170 1171 try: 1172 base_dir = tempfile.mkdtemp() 1173 1174 # Create mirror device directory hierarchy within base_dir. 1175 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1176 os.makedirs(full_dir_path) 1177 1178 # Create 32 random files within the host mirror. 1179 temp_files = make_random_host_files( 1180 in_dir=full_dir_path, num_files=32) 1181 1182 # Clean up any stale files on the device. 1183 device = adb.get_device() # pylint: disable=no-member 1184 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1185 1186 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1187 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1188 device.sync('data') 1189 if old_product_out is None: 1190 del os.environ['ANDROID_PRODUCT_OUT'] 1191 else: 1192 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1193 1194 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1195 1196 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1197 finally: 1198 if base_dir is not None: 1199 shutil.rmtree(base_dir) 1200 1201 def test_push_sync(self): 1202 """Sync a host directory to a specific path.""" 1203 1204 try: 1205 temp_dir = tempfile.mkdtemp() 1206 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1207 1208 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1209 1210 # Clean up any stale files on the device. 1211 device = adb.get_device() # pylint: disable=no-member 1212 device.shell(['rm', '-rf', device_dir]) 1213 1214 device.push(temp_dir, device_dir, sync=True) 1215 1216 self.verify_sync(device, temp_files, device_dir) 1217 1218 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1219 finally: 1220 if temp_dir is not None: 1221 shutil.rmtree(temp_dir) 1222 1223 def test_unicode_paths(self): 1224 """Ensure that we can support non-ASCII paths, even on Windows.""" 1225 name = u'로보카 폴리' 1226 1227 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1228 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1229 1230 ## push. 1231 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1232 tf.close() 1233 self.device.push(tf.name, remote_path) 1234 os.remove(tf.name) 1235 self.assertFalse(os.path.exists(tf.name)) 1236 1237 # Verify that the device ended up with the expected UTF-8 path 1238 output = self.device.shell( 1239 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1240 self.assertEqual(remote_path, output) 1241 1242 # pull. 1243 self.device.pull(remote_path, tf.name) 1244 self.assertTrue(os.path.exists(tf.name)) 1245 os.remove(tf.name) 1246 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1247 1248 1249class DeviceOfflineTest(DeviceTest): 1250 def _get_device_state(self, serialno): 1251 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1252 for line in output.split('\n'): 1253 m = re.match('(\S+)\s+(\S+)', line) 1254 if m and m.group(1) == serialno: 1255 return m.group(2) 1256 return None 1257 1258 def disabled_test_killed_when_pushing_a_large_file(self): 1259 """ 1260 While running adb push with a large file, kill adb server. 1261 Occasionally the device becomes offline. Because the device is still 1262 reading data without realizing that the adb server has been restarted. 1263 Test if we can bring the device online automatically now. 1264 http://b/32952319 1265 """ 1266 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1267 # 1. Push a large file 1268 file_path = 'tmp_large_file' 1269 try: 1270 fh = open(file_path, 'w') 1271 fh.write('\0' * (100 * 1024 * 1024)) 1272 fh.close() 1273 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1274 time.sleep(0.1) 1275 # 2. Kill the adb server 1276 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1277 subproc.terminate() 1278 finally: 1279 try: 1280 os.unlink(file_path) 1281 except: 1282 pass 1283 # 3. See if the device still exist. 1284 # Sleep to wait for the adb server exit. 1285 time.sleep(0.5) 1286 # 4. The device should be online 1287 self.assertEqual(self._get_device_state(serialno), 'device') 1288 1289 def disabled_test_killed_when_pulling_a_large_file(self): 1290 """ 1291 While running adb pull with a large file, kill adb server. 1292 Occasionally the device can't be connected. Because the device is trying to 1293 send a message larger than what is expected by the adb server. 1294 Test if we can bring the device online automatically now. 1295 """ 1296 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1297 file_path = 'tmp_large_file' 1298 try: 1299 # 1. Create a large file on device. 1300 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1301 'bs=1000000', 'count=100']) 1302 # 2. Pull the large file on host. 1303 subproc = subprocess.Popen(self.device.adb_cmd + 1304 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1305 time.sleep(0.1) 1306 # 3. Kill the adb server 1307 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1308 subproc.terminate() 1309 finally: 1310 try: 1311 os.unlink(file_path) 1312 except: 1313 pass 1314 # 4. See if the device still exist. 1315 # Sleep to wait for the adb server exit. 1316 time.sleep(0.5) 1317 self.assertEqual(self._get_device_state(serialno), 'device') 1318 1319 1320 def test_packet_size_regression(self): 1321 """Test for http://b/37783561 1322 1323 Receiving packets of a length divisible by 512 but not 1024 resulted in 1324 the adb client waiting indefinitely for more input. 1325 """ 1326 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1327 # Probe some surrounding values as well, for the hell of it. 1328 for base in [512] + range(1024, 1024 * 16, 1024): 1329 for offset in [-6, -5, -4]: 1330 length = base + offset 1331 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1332 'echo', 'foo'] 1333 rc, stdout, _ = self.device.shell_nocheck(cmd) 1334 1335 self.assertEqual(0, rc) 1336 1337 # Output should be '\0' * length, followed by "foo\n" 1338 self.assertEqual(length, len(stdout) - 4) 1339 self.assertEqual(stdout, "\0" * length + "foo\n") 1340 1341 def test_zero_packet(self): 1342 """Test for http://b/113070258 1343 1344 Make sure that we don't blow up when sending USB transfers that line up 1345 exactly with the USB packet size. 1346 """ 1347 1348 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1349 try: 1350 for size in [512, 1024]: 1351 def listener(): 1352 cmd = ["echo foo | nc -l -p 12345; echo done"] 1353 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1354 1355 thread = threading.Thread(target=listener) 1356 thread.start() 1357 1358 # Wait a bit to let the shell command start. 1359 time.sleep(0.25) 1360 1361 sock = socket.create_connection(("localhost", local_port)) 1362 with contextlib.closing(sock): 1363 bytesWritten = sock.send("a" * size) 1364 self.assertEqual(size, bytesWritten) 1365 readBytes = sock.recv(4096) 1366 self.assertEqual("foo\n", readBytes) 1367 1368 thread.join() 1369 finally: 1370 self.device.forward_remove("tcp:{}".format(local_port)) 1371 1372 1373class SocketTest(DeviceTest): 1374 def test_socket_flush(self): 1375 """Test that we handle socket closure properly. 1376 1377 If we're done writing to a socket, closing before the other end has 1378 closed will send a TCP_RST if we have incoming data queued up, which 1379 may result in data that we've written being discarded. 1380 1381 Bug: http://b/74616284 1382 """ 1383 s = socket.create_connection(("localhost", 5037)) 1384 1385 def adb_length_prefixed(string): 1386 encoded = string.encode("utf8") 1387 result = b"%04x%s" % (len(encoded), encoded) 1388 return result 1389 1390 if "ANDROID_SERIAL" in os.environ: 1391 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1392 else: 1393 transport_string = "host:transport-any" 1394 1395 s.sendall(adb_length_prefixed(transport_string)) 1396 response = s.recv(4) 1397 self.assertEquals(b"OKAY", response) 1398 1399 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1400 s.sendall(adb_length_prefixed(shell_string)) 1401 1402 response = s.recv(4) 1403 self.assertEquals(b"OKAY", response) 1404 1405 # Spawn a thread that dumps garbage into the socket until failure. 1406 def spam(): 1407 buf = b"\0" * 16384 1408 try: 1409 while True: 1410 s.sendall(buf) 1411 except Exception as ex: 1412 print(ex) 1413 1414 thread = threading.Thread(target=spam) 1415 thread.start() 1416 1417 time.sleep(1) 1418 1419 received = b"" 1420 while True: 1421 read = s.recv(512) 1422 if len(read) == 0: 1423 break 1424 received += read 1425 1426 self.assertEquals(1024 * 1024 + len("foo\n"), len(received)) 1427 thread.join() 1428 1429 1430if sys.platform == "win32": 1431 # From https://stackoverflow.com/a/38749458 1432 import os 1433 import contextlib 1434 import msvcrt 1435 import ctypes 1436 from ctypes import wintypes 1437 1438 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1439 1440 GENERIC_READ = 0x80000000 1441 GENERIC_WRITE = 0x40000000 1442 FILE_SHARE_READ = 1 1443 FILE_SHARE_WRITE = 2 1444 CONSOLE_TEXTMODE_BUFFER = 1 1445 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1446 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1447 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1448 1449 def _check_zero(result, func, args): 1450 if not result: 1451 raise ctypes.WinError(ctypes.get_last_error()) 1452 return args 1453 1454 def _check_invalid(result, func, args): 1455 if result == INVALID_HANDLE_VALUE: 1456 raise ctypes.WinError(ctypes.get_last_error()) 1457 return args 1458 1459 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1460 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1461 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1462 1463 class COORD(ctypes.Structure): 1464 _fields_ = (('X', wintypes.SHORT), 1465 ('Y', wintypes.SHORT)) 1466 1467 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1468 _fields_ = (('cbSize', wintypes.ULONG), 1469 ('dwSize', COORD), 1470 ('dwCursorPosition', COORD), 1471 ('wAttributes', wintypes.WORD), 1472 ('srWindow', wintypes.SMALL_RECT), 1473 ('dwMaximumWindowSize', COORD), 1474 ('wPopupAttributes', wintypes.WORD), 1475 ('bFullscreenSupported', wintypes.BOOL), 1476 ('ColorTable', wintypes.DWORD * 16)) 1477 def __init__(self, *args, **kwds): 1478 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1479 *args, **kwds) 1480 self.cbSize = ctypes.sizeof(self) 1481 1482 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1483 CONSOLE_SCREEN_BUFFER_INFOEX) 1484 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1485 1486 kernel32.GetStdHandle.errcheck = _check_invalid 1487 kernel32.GetStdHandle.restype = wintypes.HANDLE 1488 kernel32.GetStdHandle.argtypes = ( 1489 wintypes.DWORD,) # _In_ nStdHandle 1490 1491 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1492 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1493 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1494 wintypes.DWORD, # _In_ dwDesiredAccess 1495 wintypes.DWORD, # _In_ dwShareMode 1496 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1497 wintypes.DWORD, # _In_ dwFlags 1498 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1499 1500 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1501 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1502 wintypes.HANDLE, # _In_ hConsoleOutput 1503 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1504 1505 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1506 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1507 wintypes.HANDLE, # _In_ hConsoleOutput 1508 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1509 1510 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1511 kernel32.SetConsoleWindowInfo.argtypes = ( 1512 wintypes.HANDLE, # _In_ hConsoleOutput 1513 wintypes.BOOL, # _In_ bAbsolute 1514 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1515 1516 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1517 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1518 wintypes.HANDLE, # _In_ hConsoleOutput 1519 wintypes.WCHAR, # _In_ cCharacter 1520 wintypes.DWORD, # _In_ nLength 1521 COORD, # _In_ dwWriteCoord 1522 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1523 1524 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1525 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1526 wintypes.HANDLE, # _In_ hConsoleOutput 1527 wintypes.LPWSTR, # _Out_ lpCharacter 1528 wintypes.DWORD, # _In_ nLength 1529 COORD, # _In_ dwReadCoord 1530 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1531 1532 @contextlib.contextmanager 1533 def allocate_console(): 1534 allocated = kernel32.AllocConsole() 1535 try: 1536 yield allocated 1537 finally: 1538 if allocated: 1539 kernel32.FreeConsole() 1540 1541 @contextlib.contextmanager 1542 def console_screen(ncols=None, nrows=None): 1543 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1544 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1545 nwritten = (wintypes.DWORD * 1)() 1546 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1547 kernel32.GetConsoleScreenBufferInfoEx( 1548 hStdOut, ctypes.byref(info)) 1549 if ncols is None: 1550 ncols = info.dwSize.X 1551 if nrows is None: 1552 nrows = info.dwSize.Y 1553 elif nrows > 9999: 1554 raise ValueError('nrows must be 9999 or less') 1555 fd_screen = None 1556 hScreen = kernel32.CreateConsoleScreenBuffer( 1557 GENERIC_READ | GENERIC_WRITE, 1558 FILE_SHARE_READ | FILE_SHARE_WRITE, 1559 None, CONSOLE_TEXTMODE_BUFFER, None) 1560 try: 1561 fd_screen = msvcrt.open_osfhandle( 1562 hScreen, os.O_RDWR | os.O_BINARY) 1563 kernel32.GetConsoleScreenBufferInfoEx( 1564 hScreen, ctypes.byref(new_info)) 1565 new_info.dwSize = COORD(ncols, nrows) 1566 new_info.srWindow = wintypes.SMALL_RECT( 1567 Left=0, Top=0, Right=(ncols - 1), 1568 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1569 kernel32.SetConsoleScreenBufferInfoEx( 1570 hScreen, ctypes.byref(new_info)) 1571 kernel32.SetConsoleWindowInfo(hScreen, True, 1572 ctypes.byref(new_info.srWindow)) 1573 kernel32.FillConsoleOutputCharacterW( 1574 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1575 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1576 try: 1577 yield fd_screen 1578 finally: 1579 kernel32.SetConsoleScreenBufferInfoEx( 1580 hStdOut, ctypes.byref(info)) 1581 kernel32.SetConsoleWindowInfo(hStdOut, True, 1582 ctypes.byref(info.srWindow)) 1583 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1584 finally: 1585 if fd_screen is not None: 1586 os.close(fd_screen) 1587 else: 1588 kernel32.CloseHandle(hScreen) 1589 1590 def read_screen(fd): 1591 hScreen = msvcrt.get_osfhandle(fd) 1592 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1593 kernel32.GetConsoleScreenBufferInfoEx( 1594 hScreen, ctypes.byref(csbi)) 1595 ncols = csbi.dwSize.X 1596 pos = csbi.dwCursorPosition 1597 length = ncols * pos.Y + pos.X + 1 1598 buf = (ctypes.c_wchar * length)() 1599 n = (wintypes.DWORD * 1)() 1600 kernel32.ReadConsoleOutputCharacterW( 1601 hScreen, buf, length, COORD(0,0), n) 1602 lines = [buf[i:i+ncols].rstrip(u'\0') 1603 for i in range(0, n[0], ncols)] 1604 return u'\n'.join(lines) 1605 1606@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1607class WindowsConsoleTest(DeviceTest): 1608 def test_unicode_output(self): 1609 """Test Unicode command line parameters and Unicode console window output. 1610 1611 Bug: https://issuetracker.google.com/issues/111972753 1612 """ 1613 # If we don't have a console window, allocate one. This isn't necessary if we're already 1614 # being run from a console window, which is typical. 1615 with allocate_console() as allocated_console: 1616 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1617 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1618 # likely unnecessary given the typical console window size. 1619 with console_screen(nrows=1000) as screen: 1620 unicode_string = u'로보카 폴리' 1621 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1622 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1623 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1624 process.wait() 1625 # Read what was written by adb to the temporary console buffer. 1626 console_output = read_screen(screen) 1627 self.assertEqual(unicode_string, console_output) 1628 1629 1630def main(): 1631 random.seed(0) 1632 if len(adb.get_devices()) > 0: 1633 suite = unittest.TestLoader().loadTestsFromName(__name__) 1634 unittest.TextTestRunner(verbosity=3).run(suite) 1635 else: 1636 print('Test suite must be run with attached devices') 1637 1638 1639if __name__ == '__main__': 1640 main() 1641