1# Copyright (C) 2010-2021 Vincent Pelletier <plr.vincent@gmail.com> 2# 3# This library is free software; you can redistribute it and/or 4# modify it under the terms of the GNU Lesser General Public 5# License as published by the Free Software Foundation; either 6# version 2.1 of the License, or (at your option) any later version. 7# 8# This library is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 11# Lesser General Public License for more details. 12# 13# You should have received a copy of the GNU Lesser General Public 14# License along with this library; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17# pylint: disable=invalid-name, too-many-locals, too-many-arguments 18# pylint: disable=too-many-public-methods, too-many-instance-attributes 19# pylint: disable=missing-docstring 20""" 21Pythonic wrapper for libusb-1.0. 22 23The first thing you must do is to get an "USB context". To do so, create an 24USBContext instance. 25Then, you can use it to browse available USB devices and open the one you want 26to talk to. 27At this point, you should have a USBDeviceHandle instance (as returned by 28USBContext or USBDevice instances), and you can start exchanging with the 29device. 30 31Features: 32- Basic device settings (configuration & interface selection, ...) 33- String descriptor lookups (ASCII & unicode), and list supported language 34 codes 35- Synchronous I/O (control, bulk, interrupt) 36- Asynchronous I/O (control, bulk, interrupt, isochronous) 37 Note: Isochronous support is not well tested. 38 See USBPoller, USBTransfer and USBTransferHelper. 39 40All LIBUSB_* constants are available in this module, without the LIBUSB_ 41prefix - with one exception: LIBUSB_5GBPS_OPERATION is available as 42SUPER_SPEED_OPERATION, so it is a valid python identifier. 
43 44All LIBUSB_ERROR_* constants are available in this module as exception classes, 45subclassing USBError. 46""" 47 48from __future__ import division, absolute_import 49from ctypes import byref, c_int, sizeof, POINTER, \ 50 cast, c_uint8, c_uint16, c_ubyte, c_void_p, cdll, addressof, \ 51 c_char 52from ctypes.util import find_library 53import sys 54import threading 55import warnings 56from weakref import WeakSet 57import collections 58import functools 59import contextlib 60import inspect 61from . import libusb1 62# pylint: disable=wrong-import-order,ungrouped-imports 63if sys.platform == 'win32': 64 from ctypes import get_last_error as get_errno 65else: 66 from ctypes import get_errno 67# pylint: enable=wrong-import-order,ungrouped-imports 68 69__all__ = [ 70 'USBContext', 'USBDeviceHandle', 'USBDevice', 'hasCapability', 71 'USBPoller', 'USBTransfer', 'USBTransferHelper', 'EVENT_CALLBACK_SET', 72 'USBEndpoint', 'USBInterfaceSetting', 'USBInterface', 73 'USBConfiguration', 'DoomedTransferError', 'getVersion', 'USBError', 74] 75# Bind libusb1 constants and libusb1.USBError to this module, so user does not 76# have to import two modules. 77USBError = libusb1.USBError 78STATUS_TO_EXCEPTION_DICT = {} 79def __bindConstants(): 80 global_dict = globals() 81 PREFIX = 'LIBUSB_' 82 for name, value in libusb1.__dict__.items(): 83 if name.startswith(PREFIX): 84 name = name[len(PREFIX):] 85 # Gah. 86 if name == '5GBPS_OPERATION': 87 name = 'SUPER_SPEED_OPERATION' 88 assert name not in global_dict 89 global_dict[name] = value 90 __all__.append(name) 91 # Finer-grained exceptions. 
92 for name, value in libusb1.libusb_error.forward_dict.items(): 93 if value: 94 assert name.startswith(PREFIX + 'ERROR_'), name 95 if name == 'LIBUSB_ERROR_IO': 96 name = 'ErrorIO' 97 else: 98 name = ''.join(x.capitalize() for x in name.split('_')[1:]) 99 name = 'USB' + name 100 assert name not in global_dict, name 101 assert value not in STATUS_TO_EXCEPTION_DICT 102 STATUS_TO_EXCEPTION_DICT[value] = global_dict[name] = type( 103 name, 104 (USBError, ), 105 {'value': value}, 106 ) 107 __all__.append(name) 108__bindConstants() 109del __bindConstants 110 111def raiseUSBError( 112 value, 113 # Avoid globals lookup on call to work during interpreter shutdown. 114 # pylint: disable=dangerous-default-value 115 __STATUS_TO_EXCEPTION_DICT=STATUS_TO_EXCEPTION_DICT, 116 # pylint: enable=dangerous-default-value 117 __USBError=USBError, 118 ): 119 raise __STATUS_TO_EXCEPTION_DICT.get(value, __USBError)(value) 120 121def mayRaiseUSBError( 122 value, 123 # Avoid globals lookup on call to work during interpreter shutdown. 124 __raiseUSBError=raiseUSBError, 125 ): 126 if value < 0: 127 __raiseUSBError(value) 128 return value 129 130Version = collections.namedtuple( 131 'Version', 132 ['major', 'minor', 'micro', 'nano', 'rc', 'describe'], 133) 134 135if sys.version_info[0] == 3: 136 BYTE = bytes([0]) 137 # pylint: disable=redefined-builtin 138 xrange = range 139 long = int 140 # pylint: enable=redefined-builtin 141 integer_memoryview = memoryview 142else: 143 BYTE = '\x00' 144 # Work around python2's memoryview, which only accepts & generates strings. 145 # For consistency between async control and other async transfers. 146 # Python 2.7 will not be fixed, so wrap its memoryview. 147 # Breaks the no-copy promise, but control transfer performance should 148 # matter less than other types. 
149 class integer_memoryview(object): 150 def __init__(self, view): 151 if not isinstance(view, memoryview): 152 view = memoryview(view) 153 self.__view = view 154 155 # Many boring magic methods, just to mimic memoryview 156 def __eq__(self, other): 157 return self.__view == other 158 159 def __ge__(self, other): 160 return self.__view >= other 161 162 def __gt__(self, other): 163 return self.__view > other 164 165 def __le__(self, other): 166 return self.__view <= other 167 168 def __lt__(self, other): 169 return self.__view < other 170 171 def __ne__(self, other): 172 return self.__view != other 173 174 def __hash__(self): 175 # raises 176 return hash(self.__view) 177 178 def __delitem__(self, key): 179 # raises 180 del self.__view[key] 181 182 def __len__(self): 183 return len(self.__view) 184 185 # To access format, itemsize, ndim, readonly, shape, strides, 186 # suboffsets, tobytes, tolist. 187 def __getattr__(self, name): 188 return getattr(self.__view, name) 189 190 # Actual payload of this class. 191 def __getitem__(self, key): 192 value = self.__view[key] 193 if isinstance(value, memoryview): 194 return self.__class__(value) 195 return ord(value) 196 197 def __setitem__(self, key, value): 198 if isinstance(value, (int, long)): 199 value = chr(value) 200 else: 201 value = ''.join(chr(x) for x in value) 202 self.__view[key] = value 203 204# pylint: disable=undefined-variable 205CONTROL_SETUP = BYTE * CONTROL_SETUP_SIZE 206# pylint: enable=undefined-variable 207 208# Default string length 209# From a comment in libusb-1.0: "Some devices choke on size > 255" 210STRING_LENGTH = 255 211 212# As of v3 of USB specs, there cannot be more than 7 hubs from controller to 213# device. 
214PATH_MAX_DEPTH = 7 215 216EVENT_CALLBACK_SET = frozenset(( 217 # pylint: disable=undefined-variable 218 TRANSFER_COMPLETED, 219 TRANSFER_ERROR, 220 TRANSFER_TIMED_OUT, 221 TRANSFER_CANCELLED, 222 TRANSFER_STALL, 223 TRANSFER_NO_DEVICE, 224 TRANSFER_OVERFLOW, 225 # pylint: enable=undefined-variable 226)) 227 228DEFAULT_ASYNC_TRANSFER_ERROR_CALLBACK = lambda x: False 229 230def create_binary_buffer(init_or_size): 231 """ 232 ctypes.create_string_buffer variant which does not add a trailing null 233 when init_or_size is not a size. 234 """ 235 # As per ctypes.create_string_buffer, as of python 2.7.10 at least: 236 # - int or long is a length 237 # - str or unicode is an initialiser 238 # Testing the latter confuses 2to3, so test the former. 239 if isinstance(init_or_size, (int, long)): 240 init_or_size = bytearray(init_or_size) 241 return create_initialised_buffer(init_or_size) 242 243def create_initialised_buffer(init): 244 # raises if init is an integer - this is intentional 245 string_type = c_char * len(init) 246 try: 247 # zero-copy if init is a writable buffer 248 return string_type.from_buffer(init), init 249 # cpython (2.7 and 3.5) raises TypeError, pypy 5.4.1 raises ValueError 250 except (TypeError, ValueError): 251 # create our own writable buffer 252 init = bytearray(init) 253 return string_type.from_buffer(init), init 254 255class DoomedTransferError(Exception): 256 """Exception raised when altering/submitting a doomed transfer.""" 257 pass 258 259class USBTransfer(object): 260 """ 261 USB asynchronous transfer control & data. 262 263 All modification methods will raise if called on a submitted transfer. 264 Methods noted as "should not be called on a submitted transfer" will not 265 prevent you from reading, but returned value is unspecified. 266 267 Note on user_data: because of pypy's current ctype restrictions, user_data 268 is not provided to C level, but is managed purely in python. 
It should 269 change nothing for you, unless you are looking at underlying C transfer 270 structure - which you should never have to. 271 """ 272 # Prevent garbage collector from freeing the free function before our 273 # instances, as we need it to property destruct them. 274 __libusb_free_transfer = libusb1.libusb_free_transfer 275 __libusb_cancel_transfer = libusb1.libusb_cancel_transfer 276 __USBError = USBError 277 # pylint: disable=undefined-variable 278 __USBErrorNotFound = USBErrorNotFound 279 __mayRaiseUSBError = staticmethod(mayRaiseUSBError) 280 # pylint: enable=undefined-variable 281 __transfer = None 282 __initialized = False 283 __submitted_dict = {} 284 __callback = None 285 # Just to silence pylint watnings, this attribute gets overridden after 286 # class definition. 287 __ctypesCallbackWrapper = None 288 __doomed = False 289 __user_data = None 290 __transfer_buffer = None 291 __transfer_py_buffer = None 292 293 def __init__(self, handle, iso_packets, before_submit, after_completion): 294 """ 295 You should not instanciate this class directly. 296 Call "getTransfer" method on an USBDeviceHandle instance to get 297 instances of this class. 298 """ 299 if iso_packets < 0: 300 raise ValueError( 301 'Cannot request a negative number of iso packets.' 302 ) 303 self.__handle = handle 304 self.__before_submit = before_submit 305 self.__after_completion = after_completion 306 self.__num_iso_packets = iso_packets 307 result = libusb1.libusb_alloc_transfer(iso_packets) 308 if not result: 309 # pylint: disable=undefined-variable 310 raise USBErrorNoMem 311 # pylint: enable=undefined-variable 312 self.__transfer = result 313 314 def close(self): 315 """ 316 Break reference cycles to allow instance to be garbage-collected. 317 Raises if called on a submitted transfer. 
318 """ 319 if self.isSubmitted(): 320 raise ValueError('Cannot close a submitted transfer') 321 self.doom() 322 self.__initialized = False 323 # Break possible external reference cycles 324 self.__callback = None 325 self.__user_data = None 326 # For some reason, overwriting callback is not enough to remove this 327 # reference cycle - though sometimes it works: 328 # self -> self.__dict__ -> libusb_transfer -> dict[x] -> dict[x] -> 329 # CThunkObject -> __callbackWrapper -> self 330 # So free transfer altogether. 331 if self.__transfer is not None: 332 self.__libusb_free_transfer(self.__transfer) 333 self.__transfer = None 334 self.__transfer_buffer = None 335 # Break USBDeviceHandle reference cycle 336 self.__before_submit = None 337 self.__after_completion = None 338 339 def doom(self): 340 """ 341 Prevent transfer from being submitted again. 342 """ 343 self.__doomed = True 344 345 def __del__(self): 346 if self.__transfer is not None: 347 try: 348 # If this doesn't raise, we're doomed; transfer was submitted, 349 # still python decided to garbage-collect this instance. 350 # Stick to libusb's documentation, and don't free the 351 # transfer. If interpreter is shutting down, kernel will 352 # reclaim memory anyway. 353 # Note: we can't prevent transfer's buffer from being 354 # garbage-collected as soon as there will be no remaining 355 # reference to transfer, so a segfault might happen anyway. 356 # Should we warn user ? How ? 357 self.cancel() 358 except self.__USBErrorNotFound: 359 # Transfer was not submitted, we can free it. 360 self.__libusb_free_transfer(self.__transfer) 361 362 @classmethod 363 def __callbackWrapper(cls, transfer_p): 364 """ 365 Makes it possible for user-provided callback to alter transfer when 366 fired (ie, mark transfer as not submitted upon call). 
367 """ 368 self = cls.__submitted_dict.pop(addressof(transfer_p.contents)) 369 self.__after_completion(self) 370 callback = self.__callback 371 if callback is not None: 372 callback(self) 373 if self.__doomed: 374 self.close() 375 376 def setCallback(self, callback): 377 """ 378 Change transfer's callback. 379 """ 380 self.__callback = callback 381 382 def getCallback(self): 383 """ 384 Get currently set callback. 385 """ 386 return self.__callback 387 388 def setControl( 389 self, request_type, request, value, index, buffer_or_len, 390 callback=None, user_data=None, timeout=0): 391 """ 392 Setup transfer for control use. 393 394 request_type, request, value, index 395 See USBDeviceHandle.controlWrite. 396 request_type defines transfer direction (see 397 ENDPOINT_OUT and ENDPOINT_IN)). 398 buffer_or_len 399 Either a string (when sending data), or expected data length (when 400 receiving data). 401 callback 402 Callback function to be invoked on transfer completion. 403 Called with transfer as parameter, return value ignored. 404 user_data 405 User data to pass to callback function. 406 timeout 407 Transfer timeout in milliseconds. 0 to disable. 
408 """ 409 if self.isSubmitted(): 410 raise ValueError('Cannot alter a submitted transfer') 411 if self.__doomed: 412 raise DoomedTransferError('Cannot reuse a doomed transfer') 413 if isinstance(buffer_or_len, (int, long)): 414 length = buffer_or_len 415 # pylint: disable=undefined-variable 416 string_buffer, transfer_py_buffer = create_binary_buffer( 417 length + CONTROL_SETUP_SIZE, 418 ) 419 # pylint: enable=undefined-variable 420 else: 421 length = len(buffer_or_len) 422 string_buffer, transfer_py_buffer = create_binary_buffer( 423 CONTROL_SETUP + buffer_or_len, 424 ) 425 self.__initialized = False 426 self.__transfer_buffer = string_buffer 427 # pylint: disable=undefined-variable 428 self.__transfer_py_buffer = integer_memoryview( 429 transfer_py_buffer, 430 )[CONTROL_SETUP_SIZE:] 431 # pylint: enable=undefined-variable 432 self.__user_data = user_data 433 libusb1.libusb_fill_control_setup( 434 string_buffer, request_type, request, value, index, length) 435 libusb1.libusb_fill_control_transfer( 436 self.__transfer, self.__handle, string_buffer, 437 self.__ctypesCallbackWrapper, None, timeout) 438 self.__callback = callback 439 self.__initialized = True 440 441 def setBulk( 442 self, endpoint, buffer_or_len, callback=None, user_data=None, 443 timeout=0): 444 """ 445 Setup transfer for bulk use. 446 447 endpoint 448 Endpoint to submit transfer to. Defines transfer direction (see 449 ENDPOINT_OUT and ENDPOINT_IN)). 450 buffer_or_len 451 Either a string (when sending data), or expected data length (when 452 receiving data) 453 To avoid memory copies, use an object implementing the writeable 454 buffer interface (ex: bytearray). 455 callback 456 Callback function to be invoked on transfer completion. 457 Called with transfer as parameter, return value ignored. 458 user_data 459 User data to pass to callback function. 460 timeout 461 Transfer timeout in milliseconds. 0 to disable. 
462 """ 463 if self.isSubmitted(): 464 raise ValueError('Cannot alter a submitted transfer') 465 if self.__doomed: 466 raise DoomedTransferError('Cannot reuse a doomed transfer') 467 string_buffer, self.__transfer_py_buffer = create_binary_buffer( 468 buffer_or_len 469 ) 470 self.__initialized = False 471 self.__transfer_buffer = string_buffer 472 self.__user_data = user_data 473 libusb1.libusb_fill_bulk_transfer( 474 self.__transfer, self.__handle, endpoint, string_buffer, 475 sizeof(string_buffer), self.__ctypesCallbackWrapper, None, timeout) 476 self.__callback = callback 477 self.__initialized = True 478 479 def setInterrupt( 480 self, endpoint, buffer_or_len, callback=None, user_data=None, 481 timeout=0): 482 """ 483 Setup transfer for interrupt use. 484 485 endpoint 486 Endpoint to submit transfer to. Defines transfer direction (see 487 ENDPOINT_OUT and ENDPOINT_IN)). 488 buffer_or_len 489 Either a string (when sending data), or expected data length (when 490 receiving data) 491 To avoid memory copies, use an object implementing the writeable 492 buffer interface (ex: bytearray). 493 callback 494 Callback function to be invoked on transfer completion. 495 Called with transfer as parameter, return value ignored. 496 user_data 497 User data to pass to callback function. 498 timeout 499 Transfer timeout in milliseconds. 0 to disable. 
500 """ 501 if self.isSubmitted(): 502 raise ValueError('Cannot alter a submitted transfer') 503 if self.__doomed: 504 raise DoomedTransferError('Cannot reuse a doomed transfer') 505 string_buffer, self.__transfer_py_buffer = create_binary_buffer( 506 buffer_or_len 507 ) 508 self.__initialized = False 509 self.__transfer_buffer = string_buffer 510 self.__user_data = user_data 511 libusb1.libusb_fill_interrupt_transfer( 512 self.__transfer, self.__handle, endpoint, string_buffer, 513 sizeof(string_buffer), self.__ctypesCallbackWrapper, None, timeout) 514 self.__callback = callback 515 self.__initialized = True 516 517 def setIsochronous( 518 self, endpoint, buffer_or_len, callback=None, 519 user_data=None, timeout=0, iso_transfer_length_list=None): 520 """ 521 Setup transfer for isochronous use. 522 523 endpoint 524 Endpoint to submit transfer to. Defines transfer direction (see 525 ENDPOINT_OUT and ENDPOINT_IN)). 526 buffer_or_len 527 Either a string (when sending data), or expected data length (when 528 receiving data) 529 To avoid memory copies, use an object implementing the writeable 530 buffer interface (ex: bytearray). 531 callback 532 Callback function to be invoked on transfer completion. 533 Called with transfer as parameter, return value ignored. 534 user_data 535 User data to pass to callback function. 536 timeout 537 Transfer timeout in milliseconds. 0 to disable. 538 iso_transfer_length_list 539 List of individual transfer sizes. If not provided, buffer_or_len 540 will be divided evenly among available transfers if possible, and 541 raise ValueError otherwise. 542 """ 543 if self.isSubmitted(): 544 raise ValueError('Cannot alter a submitted transfer') 545 num_iso_packets = self.__num_iso_packets 546 if num_iso_packets == 0: 547 raise TypeError( 548 'This transfer canot be used for isochronous I/O. ' 549 'You must get another one with a non-zero iso_packets ' 550 'parameter.' 
551 ) 552 if self.__doomed: 553 raise DoomedTransferError('Cannot reuse a doomed transfer') 554 string_buffer, transfer_py_buffer = create_binary_buffer(buffer_or_len) 555 buffer_length = sizeof(string_buffer) 556 if iso_transfer_length_list is None: 557 iso_length, remainder = divmod(buffer_length, num_iso_packets) 558 if remainder: 559 raise ValueError( 560 'Buffer size %i cannot be evenly distributed among %i ' 561 'transfers' % ( 562 buffer_length, 563 num_iso_packets, 564 ) 565 ) 566 iso_transfer_length_list = [iso_length] * num_iso_packets 567 configured_iso_packets = len(iso_transfer_length_list) 568 if configured_iso_packets > num_iso_packets: 569 raise ValueError( 570 'Too many ISO transfer lengths (%i), there are ' 571 'only %i ISO transfers available' % ( 572 configured_iso_packets, 573 num_iso_packets, 574 ) 575 ) 576 if sum(iso_transfer_length_list) > buffer_length: 577 raise ValueError( 578 'ISO transfers too long (%i), there are only ' 579 '%i bytes available' % ( 580 sum(iso_transfer_length_list), 581 buffer_length, 582 ) 583 ) 584 transfer_p = self.__transfer 585 self.__initialized = False 586 self.__transfer_buffer = string_buffer 587 self.__transfer_py_buffer = transfer_py_buffer 588 self.__user_data = user_data 589 libusb1.libusb_fill_iso_transfer( 590 transfer_p, self.__handle, endpoint, string_buffer, buffer_length, 591 configured_iso_packets, self.__ctypesCallbackWrapper, None, 592 timeout) 593 for length, iso_packet_desc in zip( 594 iso_transfer_length_list, 595 libusb1.get_iso_packet_list(transfer_p)): 596 if length <= 0: 597 raise ValueError( 598 'Negative/null length transfers are not possible.' 599 ) 600 iso_packet_desc.length = length 601 self.__callback = callback 602 self.__initialized = True 603 604 def getType(self): 605 """ 606 Get transfer type. 
607 608 Returns one of: 609 TRANSFER_TYPE_CONTROL 610 TRANSFER_TYPE_ISOCHRONOUS 611 TRANSFER_TYPE_BULK 612 TRANSFER_TYPE_INTERRUPT 613 """ 614 return self.__transfer.contents.type 615 616 def getEndpoint(self): 617 """ 618 Get endpoint. 619 """ 620 return self.__transfer.contents.endpoint 621 622 def getStatus(self): 623 """ 624 Get transfer status. 625 Should not be called on a submitted transfer. 626 """ 627 return self.__transfer.contents.status 628 629 def getActualLength(self): 630 """ 631 Get actually transfered data length. 632 Should not be called on a submitted transfer. 633 """ 634 return self.__transfer.contents.actual_length 635 636 def getBuffer(self): 637 """ 638 Get data buffer content. 639 Should not be called on a submitted transfer. 640 """ 641 return self.__transfer_py_buffer 642 643 def getUserData(self): 644 """ 645 Retrieve user data provided on setup. 646 """ 647 return self.__user_data 648 649 def setUserData(self, user_data): 650 """ 651 Change user data. 652 """ 653 self.__user_data = user_data 654 655 def getISOBufferList(self): 656 """ 657 Get individual ISO transfer's buffer. 658 Returns a list with one item per ISO transfer, with their 659 individually-configured sizes. 660 Returned list is consistent with getISOSetupList return value. 661 Should not be called on a submitted transfer. 662 663 See also iterISO. 664 """ 665 transfer_p = self.__transfer 666 transfer = transfer_p.contents 667 # pylint: disable=undefined-variable 668 if transfer.type != TRANSFER_TYPE_ISOCHRONOUS: 669 # pylint: enable=undefined-variable 670 raise TypeError( 671 'This method cannot be called on non-iso transfers.' 672 ) 673 return libusb1.get_iso_packet_buffer_list(transfer_p) 674 675 def getISOSetupList(self): 676 """ 677 Get individual ISO transfer's setup. 
678 Returns a list of dicts, each containing an individual ISO transfer 679 parameters: 680 - length 681 - actual_length 682 - status 683 (see libusb1's API documentation for their signification) 684 Returned list is consistent with getISOBufferList return value. 685 Should not be called on a submitted transfer (except for 'length' 686 values). 687 """ 688 transfer_p = self.__transfer 689 transfer = transfer_p.contents 690 # pylint: disable=undefined-variable 691 if transfer.type != TRANSFER_TYPE_ISOCHRONOUS: 692 # pylint: enable=undefined-variable 693 raise TypeError( 694 'This method cannot be called on non-iso transfers.' 695 ) 696 return [ 697 { 698 'length': x.length, 699 'actual_length': x.actual_length, 700 'status': x.status, 701 } 702 for x in libusb1.get_iso_packet_list(transfer_p) 703 ] 704 705 def iterISO(self): 706 """ 707 Generator yielding (status, buffer) for each isochornous transfer. 708 buffer is truncated to actual_length. 709 This is more efficient than calling both getISOBufferList and 710 getISOSetupList when receiving data. 711 Should not be called on a submitted transfer. 712 """ 713 transfer_p = self.__transfer 714 transfer = transfer_p.contents 715 # pylint: disable=undefined-variable 716 if transfer.type != TRANSFER_TYPE_ISOCHRONOUS: 717 # pylint: enable=undefined-variable 718 raise TypeError( 719 'This method cannot be called on non-iso transfers.' 720 ) 721 buffer_position = transfer.buffer 722 for iso_transfer in libusb1.get_iso_packet_list(transfer_p): 723 yield ( 724 iso_transfer.status, 725 libusb1.buffer_at(buffer_position, iso_transfer.actual_length), 726 ) 727 buffer_position += iso_transfer.length 728 729 def setBuffer(self, buffer_or_len): 730 """ 731 Replace buffer with a new one. 732 Allows resizing read buffer and replacing data sent. 733 Note: resizing is not allowed for isochronous buffer (use 734 setIsochronous). 735 Note: disallowed on control transfers (use setControl). 
736 """ 737 if self.isSubmitted(): 738 raise ValueError('Cannot alter a submitted transfer') 739 transfer = self.__transfer.contents 740 # pylint: disable=undefined-variable 741 if transfer.type == TRANSFER_TYPE_CONTROL: 742 # pylint: enable=undefined-variable 743 raise ValueError( 744 'To alter control transfer buffer, use setControl' 745 ) 746 buff, transfer_py_buffer = create_binary_buffer(buffer_or_len) 747 # pylint: disable=undefined-variable 748 if transfer.type == TRANSFER_TYPE_ISOCHRONOUS and \ 749 sizeof(buff) != transfer.length: 750 # pylint: enable=undefined-variable 751 raise ValueError( 752 'To alter isochronous transfer buffer length, use ' 753 'setIsochronous' 754 ) 755 self.__transfer_buffer = buff 756 self.__transfer_py_buffer = transfer_py_buffer 757 transfer.buffer = cast(buff, c_void_p) 758 transfer.length = sizeof(buff) 759 760 def isSubmitted(self): 761 """ 762 Tells if this transfer is submitted and still pending. 763 """ 764 return self.__transfer is not None and addressof( 765 self.__transfer.contents, 766 ) in self.__submitted_dict 767 768 def submit(self): 769 """ 770 Submit transfer for asynchronous handling. 771 """ 772 if self.isSubmitted(): 773 raise ValueError('Cannot submit a submitted transfer') 774 if not self.__initialized: 775 raise ValueError( 776 'Cannot submit a transfer until it has been initialized' 777 ) 778 if self.__doomed: 779 raise DoomedTransferError('Cannot submit doomed transfer') 780 self.__before_submit(self) 781 transfer = self.__transfer 782 assert transfer is not None 783 self.__submitted_dict[addressof(transfer.contents)] = self 784 result = libusb1.libusb_submit_transfer(transfer) 785 if result: 786 self.__after_completion(self) 787 self.__submitted_dict.pop(addressof(transfer.contents)) 788 raiseUSBError(result) 789 790 def cancel(self): 791 """ 792 Cancel transfer. 793 Note: cancellation happens asynchronously, so you must wait for 794 TRANSFER_CANCELLED. 
795 """ 796 if not self.isSubmitted(): 797 # XXX: Workaround for a bug reported on libusb 1.0.8: calling 798 # libusb_cancel_transfer on a non-submitted transfer might 799 # trigger a segfault. 800 raise self.__USBErrorNotFound 801 self.__mayRaiseUSBError(self.__libusb_cancel_transfer(self.__transfer)) 802 803# XXX: This is very unsightly, but I do not see another way of declaring within 804# class body both the class method and its ctypes function pointer. 805# pylint: disable=protected-access,no-member 806USBTransfer._USBTransfer__ctypesCallbackWrapper = libusb1.libusb_transfer_cb_fn_p( 807 USBTransfer._USBTransfer__callbackWrapper, 808) 809# pylint: enable=protected-access,no-member 810 811class USBTransferHelper(object): 812 """ 813 Simplifies subscribing to the same transfer over and over, and callback 814 handling: 815 - no need to read event status to execute apropriate code, just setup 816 different functions for each status code 817 - just return True instead of calling submit 818 - no need to check if transfer is doomed before submitting it again, 819 DoomedTransferError is caught. 820 821 Callbacks used in this class must follow the callback API described in 822 USBTransfer, and are expected to return a boolean: 823 - True if transfer is to be submitted again (to receive/send more data) 824 - False otherwise 825 826 Note: as per libusb1 specifications, isochronous transfer global state 827 might be TRANSFER_COMPLETED although some individual packets might 828 have an error status. You can check individual packet status by calling 829 getISOSetupList on transfer object in your callback. 830 """ 831 def __init__(self, transfer=None): 832 """ 833 Create a transfer callback dispatcher. 834 835 transfer parameter is deprecated. If provided, it will be equivalent 836 to: 837 helper = USBTransferHelper() 838 transfer.setCallback(helper) 839 and also allows using deprecated methods on this class (otherwise, 840 they raise AttributeError). 
841 """ 842 if transfer is not None: 843 # Deprecated: to drop 844 self.__transfer = transfer 845 transfer.setCallback(self) 846 self.__event_callback_dict = {} 847 self.__errorCallback = DEFAULT_ASYNC_TRANSFER_ERROR_CALLBACK 848 849 def submit(self): 850 """ 851 Submit the asynchronous read request. 852 Deprecated. Use submit on transfer. 853 """ 854 # Deprecated: to drop 855 self.__transfer.submit() 856 857 def cancel(self): 858 """ 859 Cancel a pending read request. 860 Deprecated. Use cancel on transfer. 861 """ 862 # Deprecated: to drop 863 self.__transfer.cancel() 864 865 def setEventCallback(self, event, callback): 866 """ 867 Set a function to call for a given event. 868 event must be one of: 869 TRANSFER_COMPLETED 870 TRANSFER_ERROR 871 TRANSFER_TIMED_OUT 872 TRANSFER_CANCELLED 873 TRANSFER_STALL 874 TRANSFER_NO_DEVICE 875 TRANSFER_OVERFLOW 876 """ 877 if event not in EVENT_CALLBACK_SET: 878 raise ValueError('Unknown event %r.' % (event, )) 879 self.__event_callback_dict[event] = callback 880 881 def setDefaultCallback(self, callback): 882 """ 883 Set the function to call for event which don't have a specific callback 884 registered. 885 The initial default callback does nothing and returns False. 886 """ 887 self.__errorCallback = callback 888 889 def getEventCallback(self, event, default=None): 890 """ 891 Return the function registered to be called for given event identifier. 892 """ 893 return self.__event_callback_dict.get(event, default) 894 895 def __call__(self, transfer): 896 """ 897 Callback to set on transfers. 898 """ 899 if self.getEventCallback(transfer.getStatus(), self.__errorCallback)( 900 transfer): 901 try: 902 transfer.submit() 903 except DoomedTransferError: 904 pass 905 906 def isSubmited(self): 907 """ 908 Returns whether this reader is currently waiting for an event. 909 Deprecatd. Use isSubmitted on transfer. 
class USBPoller(object):
    """
    Class allowing integration of USB event polling in a file-descriptor
    monitoring event loop.

    WARNING: Do not call "poll" from several threads concurrently. Do not use
    synchronous USB transfers in a thread while "poll" is running. Doing so
    will result in unnecessarily long pauses in some threads. Opening and/or
    closing devices while polling can cause race conditions to occur.
    """
    def __init__(self, context, poller):
        """
        Create a poller for given context.
        Warning: it will not check if another poller instance was already
        present for that context, and will replace it.

        poller is a polling instance implementing the following methods:
        - register(fd, event_flags)
          event_flags have the same meaning as in poll API (POLLIN & POLLOUT)
        - unregister(fd)
        - poll(timeout)
          timeout being a float in seconds, or negative/None if there is no
          timeout.
          It must return a list of (descriptor, event) pairs.
        Note: USBPoller is itself a valid poller.
        Note2: select.poll uses a timeout in milliseconds, for some reason
        (all other select.* classes use seconds for timeout), so you should
        wrap it to convert & round/truncate timeout.
        """
        self.__context = context
        self.__poller = poller
        # File descriptors owned by libusb: events on them are consumed here
        # and never reported to the caller of poll().
        self.__fd_set = set()
        context.setPollFDNotifiers(self._registerFD, self._unregisterFD)
        for fd, events in context.getPollFDList():
            self._registerFD(fd, events)

    def __del__(self):
        self.__context.setPollFDNotifiers(None, None)

    def poll(self, timeout=None):
        """
        Poll for events.
        timeout can be a float in seconds, or None for no timeout.
        Returns a list of (descriptor, event) pairs, from which USB-internal
        descriptors have been filtered out.
        """
        next_usb_timeout = self.__context.getNextTimeout()
        if timeout is None or timeout < 0:
            usb_timeout = next_usb_timeout
        elif next_usb_timeout is not None:
            # Compare against None rather than truthiness: a next timeout of
            # 0 means "handle events right now" and must win over a longer
            # caller-provided timeout.
            # NOTE(review): assumes getNextTimeout returns None when libusb
            # has no pending timeout - confirm against USBContext.
            usb_timeout = min(next_usb_timeout, timeout)
        else:
            usb_timeout = timeout
        event_list = self.__poller.poll(usb_timeout)
        if event_list:
            fd_set = self.__fd_set
            result = [(x, y) for x, y in event_list if x not in fd_set]
            if len(result) != len(event_list):
                # At least one event was on an USB descriptor.
                self.__context.handleEventsTimeout()
        else:
            # Nothing happened before the timeout: give libusb a chance to
            # process its own timeouts.
            result = event_list
            self.__context.handleEventsTimeout()
        return result

    def register(self, fd, events):
        """
        Register an USB-unrelated fd to poller.
        Convenience method.

        Raises ValueError if fd is one of the USB event descriptors.
        """
        if fd in self.__fd_set:
            raise ValueError(
                'This fd is a special USB event fd, it cannot be polled.'
            )
        self.__poller.register(fd, events)

    def unregister(self, fd):
        """
        Unregister an USB-unrelated fd from poller.
        Convenience method.

        Raises ValueError if fd is one of the USB event descriptors.
        """
        if fd in self.__fd_set:
            raise ValueError(
                'This fd is a special USB event fd, it must stay registered.'
            )
        self.__poller.unregister(fd)

    # pylint: disable=unused-argument
    def _registerFD(self, fd, events, user_data=None):
        # Register first, mark as USB-owned afterwards: register() rejects
        # descriptors which are already in the USB set.
        self.register(fd, events)
        self.__fd_set.add(fd)
    # pylint: enable=unused-argument

    # pylint: disable=unused-argument
    def _unregisterFD(self, fd, user_data=None):
        # Remove the USB-owned mark first so unregister() accepts the fd.
        self.__fd_set.discard(fd)
        self.unregister(fd)
    # pylint: enable=unused-argument
1026 """ 1027 __handle = None 1028 __libusb_close = libusb1.libusb_close 1029 # pylint: disable=undefined-variable 1030 __USBErrorNoDevice = USBErrorNoDevice 1031 __USBErrorNotFound = USBErrorNotFound 1032 __USBErrorInterrupted = USBErrorInterrupted 1033 # pylint: enable=undefined-variable 1034 __set = set 1035 __KeyError = KeyError 1036 1037 def __init__(self, context, handle, device): 1038 """ 1039 You should not instanciate this class directly. 1040 Call "open" method on an USBDevice instance to get an USBDeviceHandle 1041 instance. 1042 """ 1043 self.__context = context 1044 # Weak reference to transfers about this device so we can clean up 1045 # before closing device. 1046 self.__transfer_set = WeakSet() 1047 # Strong references to inflight transfers so they do not get freed 1048 # even if user drops all strong references to them. If this instance 1049 # is garbage-collected, we close all transfers, so it's fine. 1050 self.__inflight = inflight = set() 1051 # XXX: For some reason, doing self.__inflight.{add|remove} inside 1052 # getTransfer causes extra intermediate python objects for each 1053 # allocated transfer. Storing them as properties solves this. Found 1054 # with objgraph. 1055 self.__inflight_add = inflight.add 1056 self.__inflight_remove = inflight.remove 1057 self.__handle = handle 1058 self.__device = device 1059 1060 def __del__(self): 1061 self.close() 1062 1063 def close(self): 1064 """ 1065 Close this handle. If not called explicitely, will be called by 1066 destructor. 1067 1068 This method cancels any in-flight transfer when it is called. As 1069 cancellation is not immediate, this method needs to let libusb handle 1070 events until transfers are actually cancelled. 1071 In multi-threaded programs, this can lead to stalls. To avoid this, 1072 do not close nor let GC collect a USBDeviceHandle which has in-flight 1073 transfers. 
def getConfiguration(self):
    """
    Get the current configuration number for this device.

    Raises whatever mayRaiseUSBError raises for the status returned by
    libusb_get_configuration.
    """
    current = c_int()
    status = libusb1.libusb_get_configuration(self.__handle, byref(current))
    mayRaiseUSBError(status)
    return current.value
def claimInterface(self, interface):
    """
    Claim (= get exclusive access to) given interface number. Required to
    receive/send data.

    The returned object can be used as a context manager, in which case
    the interface is released when the block exits:
        with handle.claimInterface(0):
            # do stuff
        # handle.releaseInterface(0) gets automatically called
    """
    status = libusb1.libusb_claim_interface(self.__handle, interface)
    mayRaiseUSBError(status)
    releaser = _ReleaseInterface(self, interface)
    return releaser
def kernelDriverActive(self, interface):
    """
    Tell whether a kernel driver is active on given interface number.

    Returns False when libusb answers 0 and True when it answers 1; any
    other answer is handed to raiseUSBError.
    """
    status = libusb1.libusb_kernel_driver_active(self.__handle, interface)
    if status in (0, 1):
        return status == 1
    raiseUSBError(status)
def getStringDescriptor(self, descriptor, lang_id, errors='strict'):
    """
    Fetch description string for given descriptor and in given language.
    Use getSupportedLanguageList to know which languages are available.

    descriptor: string descriptor index; 0 means "no string".
    lang_id: language identifier to request the string in.
    errors: error handling scheme passed to the UTF-16-LE decoder.

    Return value is a unicode string.
    Return None if there is no such descriptor on device.
    Raises ValueError if the device answer is not a string descriptor.
    """
    if descriptor == 0:
        return None
    raw = bytearray(STRING_LENGTH)
    try:
        status = libusb1.libusb_get_string_descriptor(
            self.__handle,
            descriptor,
            lang_id,
            create_binary_buffer(raw)[0],
            STRING_LENGTH,
        )
        received = mayRaiseUSBError(status)
    # pylint: disable=undefined-variable
    except USBErrorNotFound:
        # pylint: enable=undefined-variable
        return None
    if received < 2:
        raise ValueError('Invalid string descriptor')
    if raw[1] != DT_STRING:
        raise ValueError('Invalid string descriptor')
    # First byte is the descriptor's own length: never decode past it, nor
    # past what was actually received. First 2 bytes are the header.
    end = min(received, raw[0])
    payload = raw[2:end]
    return payload.decode('UTF-16-LE', errors=errors)
def controlWrite(
        self, request_type, request, value, index, data, timeout=0):
    """
    Synchronous control write.

    request_type: request type bitmask (bmRequestType), see
      constants TYPE_* and RECIPIENT_*. The direction bits are forced to
      "out".
    request: request id (some values are standard).
    value, index, data: meaning is request-dependent.
    timeout: in milliseconds, how long to wait for device acknowledgement.
      Set to 0 to disable.

    To avoid memory copies, use an object implementing the writeable buffer
    interface (ex: bytearray) for the "data" parameter.

    Returns the number of bytes actually sent.
    """
    # pylint: disable=undefined-variable
    direction_cleared = request_type & ~ENDPOINT_DIR_MASK
    out_request_type = direction_cleared | ENDPOINT_OUT
    # pylint: enable=undefined-variable
    payload, _ = create_initialised_buffer(data)
    return self._controlTransfer(
        out_request_type,
        request,
        value,
        index,
        payload,
        sizeof(payload),
        timeout,
    )
def _bulkTransfer(self, endpoint, data, length, timeout):
    """
    Run a synchronous bulk transfer and return the transferred byte count.

    On USBErrorTimeout, the exception is re-raised with a "transferred"
    attribute holding the byte count reported before the timeout.
    """
    byte_count = c_int()
    try:
        status = libusb1.libusb_bulk_transfer(
            self.__handle,
            endpoint,
            data,
            length,
            byref(byte_count),
            timeout,
        )
        mayRaiseUSBError(status)
    except USBErrorTimeout as timeout_error:
        timeout_error.transferred = byte_count.value
        raise
    return byte_count.value
def _interruptTransfer(self, endpoint, data, length, timeout):
    """
    Run a synchronous interrupt transfer and return the transferred byte
    count.

    On USBErrorTimeout, the exception is re-raised with a "transferred"
    attribute holding the byte count reported before the timeout.
    """
    byte_count = c_int()
    try:
        status = libusb1.libusb_interrupt_transfer(
            self.__handle, endpoint, data, length, byref(byte_count), timeout,
        )
        mayRaiseUSBError(status)
    except USBErrorTimeout as timeout_error:
        timeout_error.transferred = byte_count.value
        raise
    return byte_count.value
def interruptRead(self, endpoint, length, timeout=0):
    """
    Synchronous interrupt read.

    endpoint: endpoint to receive data from. The direction bits are forced
      to "in".
    length: handed to create_binary_buffer to obtain the receive buffer,
      and to libusb as the transfer length.
    timeout: in milliseconds, how long to wait for data. Set to 0 to
      disable.

    Returns received data.

    May raise an exception from the USBError family. USBErrorTimeout
    exception has a "received" property giving the bytes received up to the
    timeout.
    """
    # pylint: disable=undefined-variable
    in_endpoint = (endpoint & ~ENDPOINT_DIR_MASK) | ENDPOINT_IN
    # pylint: enable=undefined-variable
    c_buffer, received_view = create_binary_buffer(length)
    try:
        count = self._interruptTransfer(in_endpoint, c_buffer, length, timeout)
    except USBErrorTimeout as timeout_error:
        # Expose the partial data alongside the byte count set by
        # _interruptTransfer.
        timeout_error.received = received_view[:timeout_error.transferred]
        raise
    return received_view[:count]
def getMaxPower(self):
    """
    Returns device's power consumption in mA.

    USB descriptor is expressed in units of 2 mA when the device is
    operating at high-speed or slower, and in units of 8 mA when the device
    is operating at super-speed or faster. This function scales the
    descriptor value appropriately.
    """
    speed = self.__device_speed
    # Use ">=" rather than "==": speeds above SPEED_SUPER (ex: SuperSpeed
    # Plus) also express MaxPower in 8 mA units.
    # NOTE(review): relies on libusb speed constants being ordered by
    # increasing speed - confirm against libusb's libusb_speed enum.
    if speed is not None and speed >= SPEED_SUPER:
        unit_in_mA = 8
    else:
        unit_in_mA = 2
    return self.__config.MaxPower * unit_in_mA
class USBInterface(object):
    """
    Wrapper around one interface descriptor, giving access to its
    alternate settings.
    """
    def __init__(self, context, interface):
        """
        You should not instantiate this class directly.
        Call USBConfiguration methods to get instances of this class.

        Raises TypeError if interface is not a libusb1.libusb_interface.
        """
        if not isinstance(interface, libusb1.libusb_interface):
            raise TypeError('Unexpected descriptor type.')
        self.__context = context
        self.__interface = interface

    def getNumSettings(self):
        """
        Return the number of alternate settings of this interface.
        """
        descriptor = self.__interface
        return descriptor.num_altsetting

    __len__ = getNumSettings

    def __iter__(self):
        """
        Iterates over settings in this interface, yielding
        USBInterfaceSetting instances.
        """
        owner = self.__context
        raw_settings = self.__interface.altsetting
        for position in xrange(self.getNumSettings()):
            yield USBInterfaceSetting(owner, raw_settings[position])

    # BBB
    iterSettings = __iter__

    def __getitem__(self, alt_setting):
        """
        Returns an USBInterfaceSetting instance.

        Raises TypeError if alt_setting is not an integer, and IndexError
        if it is out of range.
        """
        if not isinstance(alt_setting, int):
            raise TypeError('alt_setting parameter must be an integer')
        if alt_setting < 0 or alt_setting >= self.getNumSettings():
            raise IndexError('No such setting: %r' % (alt_setting, ))
        raw_setting = self.__interface.altsetting[alt_setting]
        return USBInterfaceSetting(self.__context, raw_setting)
def getClassTuple(self):
    """
    Return interface class and subclass as a (class, subclass) pair.

    For convenience: class and subclass are probably often matched
    simultaneously.
    """
    descriptor = self.__alt_setting
    interface_class = descriptor.bInterfaceClass
    interface_subclass = descriptor.bInterfaceSubClass
    return (interface_class, interface_subclass)
class USBEndpoint(object):
    """
    Wrapper around one endpoint descriptor, exposing its fields through
    getter methods.
    """
    def __init__(self, context, endpoint):
        """
        context: context the descriptor belongs to.
        endpoint: a libusb1.libusb_endpoint_descriptor instance.

        Raises TypeError if endpoint is of any other type.
        """
        if not isinstance(endpoint, libusb1.libusb_endpoint_descriptor):
            raise TypeError('Unexpected descriptor type.')
        self.__context = context
        self.__endpoint = endpoint

    def getAddress(self):
        """Return the descriptor's bEndpointAddress field."""
        descriptor = self.__endpoint
        return descriptor.bEndpointAddress

    def getAttributes(self):
        """Return the descriptor's bmAttributes field."""
        descriptor = self.__endpoint
        return descriptor.bmAttributes

    def getMaxPacketSize(self):
        """Return the descriptor's wMaxPacketSize field."""
        descriptor = self.__endpoint
        return descriptor.wMaxPacketSize

    def getInterval(self):
        """Return the descriptor's bInterval field."""
        descriptor = self.__endpoint
        return descriptor.bInterval

    def getRefresh(self):
        """Return the descriptor's bRefresh field."""
        descriptor = self.__endpoint
        return descriptor.bRefresh

    def getSyncAddress(self):
        """Return the descriptor's bSynchAddress field."""
        descriptor = self.__endpoint
        return descriptor.bSynchAddress

    def getExtra(self):
        """Return what libusb1.get_extra gives for this descriptor."""
        descriptor = self.__endpoint
        return libusb1.get_extra(descriptor)
1744 """ 1745 self.__context = context 1746 self.__close_set = WeakSet() 1747 libusb1.libusb_ref_device(device_p) 1748 self.device_p = device_p 1749 # Fetch device descriptor 1750 device_descriptor = libusb1.libusb_device_descriptor() 1751 result = libusb1.libusb_get_device_descriptor( 1752 device_p, byref(device_descriptor)) 1753 mayRaiseUSBError(result) 1754 self.device_descriptor = device_descriptor 1755 if can_load_configuration: 1756 self.__configuration_descriptor_list = descriptor_list = [] 1757 append = descriptor_list.append 1758 device_p = self.device_p 1759 for configuration_id in xrange( 1760 self.device_descriptor.bNumConfigurations): 1761 config = libusb1.libusb_config_descriptor_p() 1762 result = libusb1.libusb_get_config_descriptor( 1763 device_p, configuration_id, byref(config)) 1764 # pylint: disable=undefined-variable 1765 if result == ERROR_NOT_FOUND: 1766 # pylint: enable=undefined-variable 1767 # Some devices (ex windows' root hubs) tell they have 1768 # one configuration, but they have no configuration 1769 # descriptor. 
def close(self):
    """
    Close everything registered in this device's close set, then release
    the libusb device reference and the configuration descriptors.

    Calling it again after the device was released only drains the close
    set.
    """
    take_closable = self.__close_set.pop
    while True:
        try:
            opened = take_closable()
        except self.__KeyError:
            # Set is exhausted.
            break
        else:
            opened.close()
    device_p = self.device_p
    if device_p:
        self.__libusb_unref_device(device_p)
        # pylint: disable=redefined-outer-name
        to_pointer = self.__byref
        # pylint: enable=redefined-outer-name
        free_descriptor = self.__libusb_free_config_descriptor
        pending = self.__configuration_descriptor_list
        while pending:
            free_descriptor(to_pointer(pending.pop()))
        # Mark as released so a later call does not unref twice.
        self.device_p = None
def getPortNumberList(self):
    """
    Get the port number of each hub toward device.

    Opens the device for the duration of the call.
    Returns a list of integers.
    """
    port_list = (c_uint8 * PATH_MAX_DEPTH)()
    temp_handle = self.open()
    try:
        result = libusb1.libusb_get_port_numbers(
            self.device_p, port_list, len(port_list))
    finally:
        # Close the temporary handle even if the call above raised, so it
        # does not stay open until garbage collection.
        temp_handle.close()
    mayRaiseUSBError(result)
    return list(port_list[:result])
def getSupportedLanguageList(self):
    """
    Get the list of language ids device has string descriptors for.
    Note: opens the device temporarily and uses synchronous API.
    """
    temp_handle = self.open()
    try:
        return temp_handle.getSupportedLanguageList()
    finally:
        # Actually make the open temporary: do not rely on garbage
        # collection to close the handle.
        temp_handle.close()
1974 You can pass this value to USBHandle.getASCIIStringDescriptor to get 1975 the actual manufacturer string. 1976 """ 1977 return self.device_descriptor.iManufacturer 1978 1979 def getProduct(self): 1980 """ 1981 Get device's product name. 1982 1983 Shortcut for .open().getProduct() . 1984 """ 1985 return self.open().getProduct() 1986 1987 def getProductDescriptor(self): 1988 """ 1989 Get the string index of device's product name. 1990 You can pass this value to USBHandle.getASCIIStringDescriptor to get 1991 the actual product name string. 1992 """ 1993 return self.device_descriptor.iProduct 1994 1995 def getSerialNumber(self): 1996 """ 1997 Get device's serial number. 1998 1999 Shortcut for .open().getSerialNumber() . 2000 """ 2001 return self.open().getSerialNumber() 2002 2003 def getSerialNumberDescriptor(self): 2004 """ 2005 Get the string index of device's serial number. 2006 You can pass this value to USBHandle.getASCIIStringDescriptor to get 2007 the actual serial number string. 2008 """ 2009 return self.device_descriptor.iSerialNumber 2010 2011 def getNumConfigurations(self): 2012 """ 2013 Get device's number of possible configurations. 2014 """ 2015 return self.device_descriptor.bNumConfigurations 2016 2017 def getDeviceSpeed(self): 2018 """ 2019 Get device's speed. 2020 2021 Returns one of: 2022 SPEED_UNKNOWN 2023 SPEED_LOW 2024 SPEED_FULL 2025 SPEED_HIGH 2026 SPEED_SUPER 2027 """ 2028 return libusb1.libusb_get_device_speed(self.device_p) 2029 2030 def open(self): 2031 """ 2032 Open device. 2033 Returns an USBDeviceHandle instance. 2034 """ 2035 handle = libusb1.libusb_device_handle_p() 2036 mayRaiseUSBError(libusb1.libusb_open(self.device_p, byref(handle))) 2037 result = USBDeviceHandle(self.__context, handle, self) 2038 self.__close_set.add(result) 2039 return result 2040 2041_zero_tv = libusb1.timeval(0, 0) 2042_zero_tv_p = byref(_zero_tv) 2043 2044class USBContext(object): 2045 """ 2046 libusb1 USB context. 
2047 2048 Provides methods to enumerate & look up USB devices. 2049 Also provides access to global (device-independent) libusb1 functions. 2050 """ 2051 __libusb_exit = libusb1.libusb_exit 2052 __context_p = None 2053 __added_cb = None 2054 __removed_cb = None 2055 __poll_cb_user_data = None 2056 __libusb_set_pollfd_notifiers = libusb1.libusb_set_pollfd_notifiers 2057 __null_pointer = c_void_p() 2058 __KeyError = KeyError 2059 __cast = staticmethod(cast) 2060 __auto_open = True 2061 2062 # pylint: disable=no-self-argument,protected-access 2063 def _validContext(func): 2064 # Defined inside USBContext so we can access "self.__*". 2065 @contextlib.contextmanager 2066 def refcount(self): 2067 with self.__context_cond: 2068 if not self.__context_p and self.__auto_open: 2069 # BBB 2070 warnings.warn( 2071 'Use "with USBContext() as context:" for safer cleanup' 2072 ' on interpreter shutdown. See also USBContext.open().', 2073 DeprecationWarning, 2074 ) 2075 self.open() 2076 self.__context_refcount += 1 2077 try: 2078 yield 2079 finally: 2080 with self.__context_cond: 2081 self.__context_refcount -= 1 2082 if not self.__context_refcount: 2083 self.__context_cond.notify_all() 2084 if inspect.isgeneratorfunction(func): 2085 def wrapper(self, *args, **kw): 2086 with refcount(self): 2087 if self.__context_p: 2088 # pylint: disable=not-callable 2089 for value in func(self, *args, **kw): 2090 # pylint: enable=not-callable 2091 yield value 2092 else: 2093 def wrapper(self, *args, **kw): 2094 with refcount(self): 2095 if self.__context_p: 2096 # pylint: disable=not-callable 2097 return func(self, *args, **kw) 2098 # pylint: enable=not-callable 2099 functools.update_wrapper(wrapper, func) 2100 return wrapper 2101 # pylint: enable=no-self-argument,protected-access 2102 2103 def __init__(self): 2104 """ 2105 Create a new USB context. 2106 """ 2107 # Used to prevent an exit to cause a segfault if a concurrent thread 2108 # is still in libusb. 
2109 self.__context_refcount = 0 2110 self.__context_cond = threading.Condition() 2111 self.__context_p = libusb1.libusb_context_p() 2112 self.__hotplug_callback_dict = {} 2113 self.__close_set = WeakSet() 2114 2115 def __del__(self): 2116 # Avoid locking. 2117 # XXX: Assumes __del__ should not normally be called while any 2118 # instance's method is being executed. It seems unlikely (they hold a 2119 # reference to their instance). 2120 self._exit() 2121 2122 def __enter__(self): 2123 return self.open() 2124 2125 def __exit__(self, exc_type, exc_val, exc_tb): 2126 self.close() 2127 2128 def open(self): 2129 """ 2130 Finish context initialisation, as is normally done in __enter__ . 2131 2132 This happens automatically on the first method call needing access to 2133 the uninitialised properties, but with a warning. 2134 Call this method ONLY if your usage pattern prevents you from using the 2135 with USBContext() as contewt: 2136 form: this means there are ways to avoid calling close(), which can 2137 cause issues particularly hard to debug (ex: interpreter hangs on 2138 exit). 2139 """ 2140 assert self.__context_refcount == 0 2141 mayRaiseUSBError(libusb1.libusb_init(byref(self.__context_p))) 2142 return self 2143 2144 def close(self): 2145 """ 2146 Close (destroy) this USB context, and all related instances. 2147 2148 When this method has been called, methods on its instance will 2149 become mosty no-ops, returning None until explicitly re-opened 2150 (by calling open() or __enter__()). 2151 2152 Note: "exit" is a deprecated alias of "close". 
2153 """ 2154 self.__auto_open = False 2155 self.__context_cond.acquire() 2156 try: 2157 while self.__context_refcount and self.__context_p: 2158 self.__context_cond.wait() 2159 self._exit() 2160 finally: 2161 self.__context_cond.notify_all() 2162 self.__context_cond.release() 2163 2164 def _exit(self): 2165 context_p = self.__context_p 2166 if context_p: 2167 while self.__hotplug_callback_dict: 2168 handle = next(iter(self.__hotplug_callback_dict)) 2169 self.hotplugDeregisterCallback(handle) 2170 pop = self.__close_set.pop 2171 while True: 2172 try: 2173 closable = pop() 2174 except self.__KeyError: 2175 break 2176 closable.close() 2177 self.__libusb_exit(context_p) 2178 self.__context_p = libusb1.libusb_context_p() 2179 self.__added_cb = self.__null_pointer 2180 self.__removed_cb = self.__null_pointer 2181 2182 # BBB 2183 exit = close 2184 2185 @_validContext 2186 def getDeviceIterator(self, skip_on_error=False): 2187 """ 2188 Return an iterator over all USB devices currently plugged in, as USBDevice 2189 instances. 2190 2191 skip_on_error (bool) 2192 If True, ignore devices which raise USBError. 2193 """ 2194 device_p_p = libusb1.libusb_device_p_p() 2195 libusb_device_p = libusb1.libusb_device_p 2196 device_list_len = libusb1.libusb_get_device_list(self.__context_p, 2197 byref(device_p_p)) 2198 mayRaiseUSBError(device_list_len) 2199 try: 2200 for device_p in device_p_p[:device_list_len]: 2201 try: 2202 # Instanciate our own libusb_device_p object so we can free 2203 # libusb-provided device list. Is this a bug in ctypes that 2204 # it doesn't copy pointer value (=pointed memory address) ? 2205 # At least, it's not so convenient and forces using such 2206 # weird code. 
2207 device = USBDevice(self, libusb_device_p(device_p.contents)) 2208 except USBError: 2209 if not skip_on_error: 2210 raise 2211 else: 2212 self.__close_set.add(device) 2213 yield device 2214 finally: 2215 libusb1.libusb_free_device_list(device_p_p, 1) 2216 2217 def getDeviceList(self, skip_on_access_error=False, skip_on_error=False): 2218 """ 2219 Return a list of all USB devices currently plugged in, as USBDevice 2220 instances. 2221 2222 skip_on_error (bool) 2223 If True, ignore devices which raise USBError. 2224 2225 skip_on_access_error (bool) 2226 DEPRECATED. Alias for skip_on_error. 2227 """ 2228 return list( 2229 self.getDeviceIterator( 2230 skip_on_error=skip_on_access_error or skip_on_error, 2231 ), 2232 ) 2233 2234 def getByVendorIDAndProductID( 2235 self, vendor_id, product_id, 2236 skip_on_access_error=False, skip_on_error=False): 2237 """ 2238 Get the first USB device matching given vendor and product ids. 2239 Returns an USBDevice instance, or None if no present device match. 2240 skip_on_error (bool) 2241 (see getDeviceList) 2242 skip_on_access_error (bool) 2243 (see getDeviceList) 2244 """ 2245 for device in self.getDeviceIterator( 2246 skip_on_error=skip_on_access_error or skip_on_error, 2247 ): 2248 if device.getVendorID() == vendor_id and \ 2249 device.getProductID() == product_id: 2250 return device 2251 2252 def openByVendorIDAndProductID( 2253 self, vendor_id, product_id, 2254 skip_on_access_error=False, skip_on_error=False): 2255 """ 2256 Get the first USB device matching given vendor and product ids. 2257 Returns an USBDeviceHandle instance, or None if no present device 2258 match. 
2259 skip_on_error (bool) 2260 (see getDeviceList) 2261 skip_on_access_error (bool) 2262 (see getDeviceList) 2263 """ 2264 result = self.getByVendorIDAndProductID( 2265 vendor_id, product_id, 2266 skip_on_access_error=skip_on_access_error, 2267 skip_on_error=skip_on_error) 2268 if result is not None: 2269 return result.open() 2270 2271 @_validContext 2272 def getPollFDList(self): 2273 """ 2274 Return file descriptors to be used to poll USB events. 2275 You should not have to call this method, unless you are integrating 2276 this class with a polling mechanism. 2277 """ 2278 pollfd_p_p = libusb1.libusb_get_pollfds(self.__context_p) 2279 if not pollfd_p_p: 2280 errno = get_errno() 2281 if errno: 2282 raise OSError(errno) 2283 else: 2284 # Assume not implemented 2285 raise NotImplementedError( 2286 'Your libusb does not seem to implement pollable FDs') 2287 try: 2288 result = [] 2289 append = result.append 2290 fd_index = 0 2291 while pollfd_p_p[fd_index]: 2292 append(( 2293 pollfd_p_p[fd_index].contents.fd, 2294 pollfd_p_p[fd_index].contents.events, 2295 )) 2296 fd_index += 1 2297 finally: 2298 libusb1.libusb_free_pollfds(pollfd_p_p) 2299 return result 2300 2301 @_validContext 2302 def handleEvents(self): 2303 """ 2304 Handle any pending event (blocking). 2305 See libusb1 documentation for details (there is a timeout, so it's 2306 not "really" blocking). 2307 """ 2308 mayRaiseUSBError( 2309 libusb1.libusb_handle_events(self.__context_p), 2310 ) 2311 2312 # TODO: handleEventsCompleted 2313 2314 @_validContext 2315 def handleEventsTimeout(self, tv=0): 2316 """ 2317 Handle any pending event. 2318 If tv is 0, will return immediately after handling already-pending 2319 events. 2320 Otherwise, defines the maximum amount of time to wait for events, in 2321 seconds. 
2322 """ 2323 if tv is None: 2324 tv = 0 2325 tv_s = int(tv) 2326 real_tv = libusb1.timeval(tv_s, int((tv - tv_s) * 1000000)) 2327 mayRaiseUSBError( 2328 libusb1.libusb_handle_events_timeout( 2329 self.__context_p, byref(real_tv), 2330 ), 2331 ) 2332 2333 # TODO: handleEventsTimeoutCompleted 2334 2335 @_validContext 2336 def setPollFDNotifiers( 2337 self, added_cb=None, removed_cb=None, user_data=None): 2338 """ 2339 Give libusb1 methods to call when it should add/remove file descriptor 2340 for polling. 2341 You should not have to call this method, unless you are integrating 2342 this class with a polling mechanism. 2343 """ 2344 if added_cb is None: 2345 added_cb = self.__null_pointer 2346 else: 2347 added_cb = libusb1.libusb_pollfd_added_cb_p(added_cb) 2348 if removed_cb is None: 2349 removed_cb = self.__null_pointer 2350 else: 2351 removed_cb = libusb1.libusb_pollfd_removed_cb_p(removed_cb) 2352 if user_data is None: 2353 user_data = self.__null_pointer 2354 self.__added_cb = added_cb 2355 self.__removed_cb = removed_cb 2356 self.__poll_cb_user_data = user_data 2357 self.__libusb_set_pollfd_notifiers( 2358 self.__context_p, 2359 self.__cast(added_cb, libusb1.libusb_pollfd_added_cb_p), 2360 self.__cast(removed_cb, libusb1.libusb_pollfd_removed_cb_p), 2361 user_data, 2362 ) 2363 2364 @_validContext 2365 def getNextTimeout(self): 2366 """ 2367 Returns the next internal timeout that libusb needs to handle, in 2368 seconds, or None if no timeout is needed. 2369 You should not have to call this method, unless you are integrating 2370 this class with a polling mechanism. 2371 """ 2372 timeval = libusb1.timeval() 2373 result = libusb1.libusb_get_next_timeout( 2374 self.__context_p, byref(timeval)) 2375 if result == 0: 2376 return None 2377 elif result == 1: 2378 return timeval.tv_sec + (timeval.tv_usec * 0.000001) 2379 raiseUSBError(result) 2380 2381 @_validContext 2382 def setDebug(self, level): 2383 """ 2384 Set debugging level. See LOG_LEVEL_* constants. 
2385 Note: depending on libusb compilation settings, this might have no 2386 effect. 2387 """ 2388 libusb1.libusb_set_debug(self.__context_p, level) 2389 2390 @_validContext 2391 def tryLockEvents(self): 2392 warnings.warn( 2393 'You may not be able to unlock in the event of USBContext exit. ' 2394 'Consider looping over handleEvents() in a thread.', 2395 DeprecationWarning, 2396 ) 2397 return libusb1.libusb_try_lock_events(self.__context_p) 2398 2399 @_validContext 2400 def lockEvents(self): 2401 warnings.warn( 2402 'You may not be able to unlock in the event of USBContext exit. ' 2403 'Consider looping over handleEvents() in a thread.', 2404 DeprecationWarning, 2405 ) 2406 libusb1.libusb_lock_events(self.__context_p) 2407 2408 @_validContext 2409 def lockEventWaiters(self): 2410 warnings.warn( 2411 'You may not be able to unlock in the event of USBContext exit. ' 2412 'Consider looping over handleEvents() in a thread.', 2413 DeprecationWarning, 2414 ) 2415 libusb1.libusb_lock_event_waiters(self.__context_p) 2416 2417 @_validContext 2418 def waitForEvent(self, tv=0): 2419 warnings.warn( 2420 'Consider looping over handleEvents() in a thread.', 2421 DeprecationWarning, 2422 ) 2423 if tv is None: 2424 tv = 0 2425 tv_s = int(tv) 2426 real_tv = libusb1.timeval(tv_s, int((tv - tv_s) * 1000000)) 2427 libusb1.libusb_wait_for_event(self.__context_p, byref(real_tv)) 2428 2429 @_validContext 2430 def unlockEventWaiters(self): 2431 warnings.warn( 2432 'This method will lock in the event of USBContext exit, ' 2433 'preventing libusb lock release. 
' 2434 'Consider looping over handleEvents() in a thread.', 2435 DeprecationWarning, 2436 ) 2437 libusb1.libusb_unlock_event_waiters(self.__context_p) 2438 2439 @_validContext 2440 def eventHandlingOK(self): 2441 warnings.warn( 2442 'Consider looping over handleEvents() in a thread.', 2443 DeprecationWarning, 2444 ) 2445 return libusb1.libusb_event_handling_ok(self.__context_p) 2446 2447 @_validContext 2448 def unlockEvents(self): 2449 warnings.warn( 2450 'This method will lock in the event of USBContext exit, ' 2451 'preventing libusb lock release. ' 2452 'Consider looping over handleEvents() in a thread.', 2453 DeprecationWarning, 2454 ) 2455 libusb1.libusb_unlock_events(self.__context_p) 2456 2457 @_validContext 2458 def handleEventsLocked(self): 2459 warnings.warn( 2460 'Consider looping over handleEvents() in a thread.', 2461 DeprecationWarning, 2462 ) 2463 # XXX: does tv parameter need to be exposed ? 2464 mayRaiseUSBError(libusb1.libusb_handle_events_locked( 2465 self.__context_p, _zero_tv_p, 2466 )) 2467 2468 @_validContext 2469 def eventHandlerActive(self): 2470 warnings.warn( 2471 'Consider looping over handleEvents() in a thread.', 2472 DeprecationWarning, 2473 ) 2474 return libusb1.libusb_event_handler_active(self.__context_p) 2475 2476 @staticmethod 2477 def hasCapability(capability): 2478 """ 2479 Backward-compatibility alias for module-level hasCapability. 2480 """ 2481 return hasCapability(capability) 2482 2483 @_validContext 2484 def hotplugRegisterCallback( 2485 self, callback, 2486 # pylint: disable=undefined-variable 2487 events=HOTPLUG_EVENT_DEVICE_ARRIVED | HOTPLUG_EVENT_DEVICE_LEFT, 2488 flags=HOTPLUG_ENUMERATE, 2489 vendor_id=HOTPLUG_MATCH_ANY, 2490 product_id=HOTPLUG_MATCH_ANY, 2491 dev_class=HOTPLUG_MATCH_ANY, 2492 # pylint: enable=undefined-variable 2493 ): 2494 """ 2495 Registers an hotplug callback. 2496 On success, returns an opaque value which can be passed to 2497 hotplugDeregisterCallback. 
2498 Callback must accept the following positional arguments: 2499 - this USBContext instance 2500 - an USBDevice instance 2501 If device has left, configuration descriptors may not be 2502 available. Its device descriptor will be available. 2503 - event type, one of: 2504 HOTPLUG_EVENT_DEVICE_ARRIVED 2505 HOTPLUG_EVENT_DEVICE_LEFT 2506 Callback must return whether it must be unregistered (any true value 2507 to be unregistered, any false value to be kept registered). 2508 2509 Note: given callback will be invoked during event handling, meaning 2510 it cannot call any synchronous libusb function. 2511 """ 2512 def wrapped_callback(context_p, device_p, event, _): 2513 assert addressof(context_p.contents) == addressof( 2514 self.__context_p.contents), (context_p, self.__context_p) 2515 device = USBDevice( 2516 self, 2517 device_p, 2518 # pylint: disable=undefined-variable 2519 event != HOTPLUG_EVENT_DEVICE_LEFT, 2520 # pylint: enable=undefined-variable 2521 ) 2522 self.__close_set.add(device) 2523 unregister = bool(callback( 2524 self, 2525 device, 2526 event, 2527 )) 2528 if unregister: 2529 del self.__hotplug_callback_dict[handle] 2530 return unregister 2531 handle = c_int() 2532 callback_p = libusb1.libusb_hotplug_callback_fn_p(wrapped_callback) 2533 mayRaiseUSBError(libusb1.libusb_hotplug_register_callback( 2534 self.__context_p, events, flags, vendor_id, product_id, dev_class, 2535 callback_p, None, byref(handle), 2536 )) 2537 handle = handle.value 2538 # Keep strong references 2539 assert handle not in self.__hotplug_callback_dict, ( 2540 handle, 2541 self.__hotplug_callback_dict, 2542 ) 2543 self.__hotplug_callback_dict[handle] = (callback_p, wrapped_callback) 2544 return handle 2545 2546 @_validContext 2547 def hotplugDeregisterCallback(self, handle): 2548 """ 2549 Deregisters an hotplug callback. 2550 handle (opaque) 2551 Return value of a former hotplugRegisterCallback call. 
2552 """ 2553 del self.__hotplug_callback_dict[handle] 2554 libusb1.libusb_hotplug_deregister_callback(self.__context_p, handle) 2555 2556del USBContext._validContext 2557 2558def getVersion(): 2559 """ 2560 Returns underlying libusb's version information as a 6-namedtuple (or 2561 6-tuple if namedtuples are not avaiable): 2562 - major 2563 - minor 2564 - micro 2565 - nano 2566 - rc 2567 - describe 2568 Returns (0, 0, 0, 0, '', '') if libusb doesn't have required entry point. 2569 """ 2570 version = libusb1.libusb_get_version().contents 2571 return Version( 2572 version.major, 2573 version.minor, 2574 version.micro, 2575 version.nano, 2576 version.rc, 2577 version.describe, 2578 ) 2579 2580def hasCapability(capability): 2581 """ 2582 Tests feature presence. 2583 2584 capability should be one of: 2585 CAP_HAS_CAPABILITY 2586 CAP_HAS_HOTPLUG 2587 CAP_HAS_HID_ACCESS 2588 CAP_SUPPORTS_DETACH_KERNEL_DRIVER 2589 """ 2590 return libusb1.libusb_has_capability(capability) 2591 2592class LibUSBContext(USBContext): 2593 """ 2594 Backward-compatibility alias for USBContext. 2595 """ 2596 def __init__(self): 2597 warnings.warn( 2598 'LibUSBContext is being renamed to USBContext', 2599 DeprecationWarning, 2600 ) 2601 super(LibUSBContext, self).__init__() 2602 2603from ._version import get_versions 2604__version__ = get_versions()['version'] 2605del get_versions 2606