1# -*- coding: utf-8 -*- 2"""High level wrapper for a Resource. 3 4This file is part of PyVISA. 5 6:copyright: 2014-2020 by PyVISA Authors, see AUTHORS for more details. 7:license: MIT, see LICENSE for more details. 8 9""" 10import contextlib 11import time 12import warnings 13from functools import update_wrapper 14from typing import ( 15 Any, 16 Callable, 17 ContextManager, 18 Iterator, 19 Optional, 20 Set, 21 Type, 22 TypeVar, 23 Union, 24 cast, 25) 26 27from typing_extensions import ClassVar, Literal 28 29from .. import attributes, constants, errors, highlevel, logger, rname, typing, util 30from ..attributes import Attribute 31from ..events import Event 32from ..typing import VISAEventContext, VISAHandler, VISASession 33 34 35class WaitResponse: 36 """Class used in return of wait_on_event. 37 38 It properly closes the context upon delete. 39 40 A call with event_type of 0 (normally used when timed_out is True) will store 41 None as the event and event type, otherwise it records the proper Event. 42 43 """ 44 45 #: Reference to the event object that was waited for. 46 event: Event 47 48 #: Status code returned by the VISA library 49 ret: constants.StatusCode 50 51 #: Did a timeout occurs 52 timed_out: bool 53 54 def __init__( 55 self, 56 event_type: constants.EventType, 57 context: Optional[VISAEventContext], 58 ret: constants.StatusCode, 59 visalib: highlevel.VisaLibraryBase, 60 timed_out: bool = False, 61 ): 62 self.event = Event(visalib, event_type, context) 63 self._event_type = constants.EventType(event_type) 64 self._context = context 65 self.ret = ret 66 self._visalib = visalib 67 self.timed_out = timed_out 68 69 @property 70 def event_type(self) -> Optional[constants.EventType]: 71 warnings.warn( 72 "event_type is deprecated and will be removed in 1.12. " 73 "Use the event object instead.", 74 FutureWarning, 75 ) 76 return self._event_type 77 78 @property 79 def context(self) -> Optional[VISAEventContext]: 80 warnings.warn( 81 "context is deprecated and will be removed in 1.12. " 82 "Use the event object instead to access the event attributes.", 83 FutureWarning, 84 ) 85 return self._context 86 87 def __del__(self) -> None: 88 if self.event._context is not None: 89 try: 90 self._visalib.close(self.event._context) 91 self.event.close() 92 except errors.VisaIOError: 93 pass 94 95 96T = TypeVar("T", bound="Resource") 97 98 99class Resource(object): 100 """Base class for resources. 101 102 Do not instantiate directly, use 103 :meth:`pyvisa.highlevel.ResourceManager.open_resource`. 104 105 """ 106 107 #: Reference to the resource manager used by this resource 108 resource_manager: highlevel.ResourceManager 109 110 #: Reference to the VISA library instance used by the resource 111 visalib: highlevel.VisaLibraryBase 112 113 #: VISA attribute descriptor classes that can be used to introspect the 114 #: supported attributes and the possible values. The "often used" ones 115 #: are generally directly available on the resource. 116 visa_attributes_classes: ClassVar[Set[Type[attributes.Attribute]]] 117 118 @classmethod 119 def register( 120 cls, interface_type: constants.InterfaceType, resource_class: str 121 ) -> Callable[[Type[T]], Type[T]]: 122 """Create a decorator to register a class. 123 124 The class is associated to an interface type, resource class pair. 125 126 Parameters 127 ---------- 128 interface_type : constants.InterfaceType 129 Interface type for which to register a wrapper class. 130 resource_class : str 131 Resource class for which to register a wrapper class. 132 133 Returns 134 ------- 135 Callable[[Type[T]], Type[T]] 136 Decorator registering the class. Raises TypeError if some VISA 137 attributes are missing on the registered class. 138 139 """ 140 141 def _internal(python_class): 142 143 highlevel.ResourceManager.register_resource_class( 144 interface_type, resource_class, python_class 145 ) 146 147 return python_class 148 149 return _internal 150 151 def __init__( 152 self, resource_manager: highlevel.ResourceManager, resource_name: str 153 ) -> None: 154 self._resource_manager = resource_manager 155 self.visalib = self._resource_manager.visalib 156 157 # We store the resource name and use preferably the private attr over 158 # the public descriptor internally because the public descriptor 159 # requires a live instance the VISA library, which means it is much 160 # slower but also can cause issue in error reporting when accessing the 161 # repr 162 self._resource_name: str 163 try: 164 # Attempt to normalize the resource name. Can fail for aliases 165 self._resource_name = str(rname.ResourceName.from_string(resource_name)) 166 except rname.InvalidResourceName: 167 self._resource_name = resource_name 168 169 self._logging_extra = { 170 "library_path": self.visalib.library_path, 171 "resource_manager.session": self._resource_manager.session, 172 "resource_name": self._resource_name, 173 "session": None, 174 } 175 176 #: Session handle. 177 self._session: Optional[VISASession] = None 178 179 @property 180 def session(self) -> VISASession: 181 """Resource session handle. 182 183 Raises 184 ------ 185 errors.InvalidSession 186 Raised if session is closed. 187 188 """ 189 if self._session is None: 190 raise errors.InvalidSession() 191 return self._session 192 193 @session.setter 194 def session(self, value: Optional[VISASession]) -> None: 195 self._session = value 196 197 def __del__(self) -> None: 198 if self._session is not None: 199 self.close() 200 201 def __str__(self) -> str: 202 return "%s at %s" % (self.__class__.__name__, self._resource_name) 203 204 def __repr__(self) -> str: 205 return "<%r(%r)>" % (self.__class__.__name__, self._resource_name) 206 207 def __enter__(self) -> "Resource": 208 return self 209 210 def __exit__(self, *args) -> None: 211 self.close() 212 213 @property 214 def last_status(self) -> constants.StatusCode: 215 """Last status code for this session.""" 216 return self.visalib.get_last_status_in_session(self.session) 217 218 @property 219 def resource_info(self) -> highlevel.ResourceInfo: 220 """Get the extended information of this resource.""" 221 return self.visalib.parse_resource_extended( 222 self._resource_manager.session, self._resource_name 223 )[0] 224 225 # --- VISA attributes -------------------------------------------------------------- 226 227 #: VISA attributes require the resource to be opened in order to get accessed. 228 #: Please have a look at the attributes definition for more details 229 230 #: Interface type of the given session. 231 interface_type: Attribute[ 232 constants.InterfaceType 233 ] = attributes.AttrVI_ATTR_INTF_TYPE() 234 235 #: Board number for the given interface. 236 interface_number: Attribute[int] = attributes.AttrVI_ATTR_INTF_NUM() 237 238 #: Resource class (for example, "INSTR") as defined by the canonical resource name. 239 resource_class: Attribute[str] = attributes.AttrVI_ATTR_RSRC_CLASS() 240 241 #: Unique identifier for a resource compliant with the address structure. 242 resource_name: Attribute[str] = attributes.AttrVI_ATTR_RSRC_NAME() 243 244 #: Resource version that identifies the revisions or implementations of a resource. 245 implementation_version: Attribute[int] = attributes.AttrVI_ATTR_RSRC_IMPL_VERSION() 246 247 #: Current locking state of the resource. 248 lock_state: Attribute[ 249 constants.AccessModes 250 ] = attributes.AttrVI_ATTR_RSRC_LOCK_STATE() 251 252 #: Version of the VISA specification to which the implementation is compliant. 253 spec_version: Attribute[int] = attributes.AttrVI_ATTR_RSRC_SPEC_VERSION() 254 255 #: Manufacturer name of the vendor that implemented the VISA library. 256 resource_manufacturer_name: Attribute[str] = attributes.AttrVI_ATTR_RSRC_MANF_NAME() 257 258 #: Timeout in milliseconds for all resource I/O operations. 259 timeout: Attribute[float] = attributes.AttrVI_ATTR_TMO_VALUE() 260 261 def ignore_warning( 262 self, *warnings_constants: constants.StatusCode 263 ) -> ContextManager: 264 """Ignoring warnings context manager for the current resource. 265 266 Parameters 267 ---------- 268 warnings_constants : constants.StatusCode 269 Constants identifying the warnings to ignore. 270 271 """ 272 return self.visalib.ignore_warning(self.session, *warnings_constants) 273 274 def open( 275 self, 276 access_mode: constants.AccessModes = constants.AccessModes.no_lock, 277 open_timeout: int = 5000, 278 ) -> None: 279 """Opens a session to the specified resource. 280 281 Parameters 282 ---------- 283 access_mode : constants.AccessModes, optional 284 Specifies the mode by which the resource is to be accessed. 285 Defaults to constants.AccessModes.no_lock. 286 open_timeout : int, optional 287 If the ``access_mode`` parameter requests a lock, then this parameter 288 specifies the absolute time period (in milliseconds) that the 289 resource waits to get unlocked before this operation returns an error. 290 Defaults to 5000. 291 292 """ 293 logger.debug("%s - opening ...", self._resource_name, extra=self._logging_extra) 294 with self._resource_manager.ignore_warning( 295 constants.StatusCode.success_device_not_present 296 ): 297 self.session, status = self._resource_manager.open_bare_resource( 298 self._resource_name, access_mode, open_timeout 299 ) 300 301 if status == constants.StatusCode.success_device_not_present: 302 # The device was not ready when we opened the session. 303 # Now it gets five seconds more to become ready. 304 # Every 0.1 seconds we probe it with viClear. 305 start_time = time.time() 306 sleep_time = 0.1 307 try_time = 5 308 while time.time() - start_time < try_time: 309 time.sleep(sleep_time) 310 try: 311 self.clear() 312 break 313 except errors.VisaIOError as error: 314 if error.error_code != constants.StatusCode.error_no_listeners: 315 raise 316 317 self._logging_extra["session"] = self.session 318 logger.debug( 319 "%s - is open with session %s", 320 self._resource_name, 321 self.session, 322 extra=self._logging_extra, 323 ) 324 325 def before_close(self) -> None: 326 """Called just before closing an instrument.""" 327 self.__switch_events_off() 328 329 def close(self) -> None: 330 """Closes the VISA session and marks the handle as invalid.""" 331 try: 332 logger.debug("%s - closing", self._resource_name, extra=self._logging_extra) 333 self.before_close() 334 self.visalib.close(self.session) 335 logger.debug( 336 "%s - is closed", self._resource_name, extra=self._logging_extra 337 ) 338 # Mypy is confused by the idea that we can set a value we cannot get 339 self.session = None # type: ignore 340 except errors.InvalidSession: 341 pass 342 343 def __switch_events_off(self) -> None: 344 """Switch off and discrads all events.""" 345 self.disable_event( 346 constants.EventType.all_enabled, constants.EventMechanism.all 347 ) 348 self.discard_events( 349 constants.EventType.all_enabled, constants.EventMechanism.all 350 ) 351 self.visalib.uninstall_all_visa_handlers(self.session) 352 353 def get_visa_attribute(self, name: constants.ResourceAttribute) -> Any: 354 """Retrieves the state of an attribute in this resource. 355 356 One should prefer the dedicated descriptor for often used attributes 357 since those perform checks and automatic conversion on the value. 358 359 Parameters 360 ---------- 361 name : constants.ResourceAttribute 362 Resource attribute for which the state query is made. 363 364 Returns 365 ------- 366 Any 367 The state of the queried attribute for a specified resource. 368 369 """ 370 return self.visalib.get_attribute(self.session, name)[0] 371 372 def set_visa_attribute( 373 self, name: constants.ResourceAttribute, state: Any 374 ) -> constants.StatusCode: 375 """Set the state of an attribute. 376 377 One should prefer the dedicated descriptor for often used attributes 378 since those perform checks and automatic conversion on the value. 379 380 Parameters 381 ---------- 382 name : constants.ResourceAttribute 383 Attribute for which the state is to be modified. 384 state : Any 385 The state of the attribute to be set for the specified object. 386 387 Returns 388 ------- 389 constants.StatusCode 390 Return value of the library call. 391 392 """ 393 return self.visalib.set_attribute(self.session, name, state) 394 395 def clear(self) -> None: 396 """Clear this resource.""" 397 self.visalib.clear(self.session) 398 399 def install_handler( 400 self, event_type: constants.EventType, handler: VISAHandler, user_handle=None 401 ) -> Any: 402 """Install handlers for event callbacks in this resource. 403 404 Parameters 405 ---------- 406 event_type : constants.EventType 407 Logical event identifier. 408 handler : VISAHandler 409 Handler function to be installed by a client application. 410 user_handle : 411 A value specified by an application that can be used for identifying 412 handlers uniquely for an event type. Depending on the backend they 413 may be restriction on the possible values. Look at the backend 414 `install_visa_handler` for more details. 415 416 Returns 417 ------- 418 Any 419 User handle in a format amenable to the backend. This is this 420 representation of the handle that should be used when unistalling 421 a handler. 422 423 """ 424 return self.visalib.install_visa_handler( 425 self.session, event_type, handler, user_handle 426 ) 427 428 def wrap_handler( 429 self, callable: Callable[["Resource", Event, Any], None] 430 ) -> VISAHandler: 431 """Wrap an event handler to provide the signature expected by VISA. 432 433 The handler is expected to have the following signature: 434 handler(resource: Resource, event: Event, user_handle: Any) -> None. 435 436 The wrapped handler should be used only to handle events on the resource 437 used to wrap the handler. 438 439 """ 440 441 def event_handler( 442 session: VISASession, 443 event_type: constants.EventType, 444 event_context: typing.VISAEventContext, 445 user_handle: Any, 446 ) -> None: 447 if session != self.session: 448 raise RuntimeError( 449 "When wrapping a handler, the resource used to wrap the handler" 450 "must be the same on which the handler will be installed." 451 f"Wrapping session: {self.session}, event on session: {session}" 452 ) 453 event = Event(self.visalib, event_type, event_context) 454 try: 455 return callable(self, event, user_handle) 456 finally: 457 event.close() 458 459 update_wrapper(event_handler, callable) 460 461 return event_handler 462 463 def uninstall_handler( 464 self, event_type: constants.EventType, handler: VISAHandler, user_handle=None 465 ) -> None: 466 """Uninstalls handlers for events in this resource. 467 468 Parameters 469 ---------- 470 event_type : constants.EventType 471 Logical event identifier. 472 handler : VISAHandler 473 Handler function to be uninstalled by a client application. 474 user_handle : Any 475 The user handle returned by install_handler. 476 477 """ 478 self.visalib.uninstall_visa_handler( 479 self.session, event_type, handler, user_handle 480 ) 481 482 def disable_event( 483 self, event_type: constants.EventType, mechanism: constants.EventMechanism 484 ) -> None: 485 """Disable notification for an event type(s) via the specified mechanism(s). 486 487 Parameters 488 ---------- 489 event_type : constants.EventType 490 Logical event identifier. 491 mechanism : constants.EventMechanism 492 Specifies event handling mechanisms to be disabled. 493 494 """ 495 self.visalib.disable_event(self.session, event_type, mechanism) 496 497 def discard_events( 498 self, event_type: constants.EventType, mechanism: constants.EventMechanism 499 ) -> None: 500 """Discards event occurrences for an event type and mechanism in this resource. 501 502 Parameters 503 ---------- 504 event_type : constants.EventType 505 Logical event identifier. 506 mechanism : constants.EventMechanism 507 Specifies event handling mechanisms to be disabled. 508 509 """ 510 self.visalib.discard_events(self.session, event_type, mechanism) 511 512 def enable_event( 513 self, 514 event_type: constants.EventType, 515 mechanism: constants.EventMechanism, 516 context: None = None, 517 ) -> None: 518 """Enable event occurrences for specified event types and mechanisms in this resource. 519 520 Parameters 521 ---------- 522 event_type : constants.EventType 523 Logical event identifier. 524 mechanism : constants.EventMechanism 525 Specifies event handling mechanisms to be enabled 526 context : None 527 Not currently used, leave as None. 528 529 """ 530 self.visalib.enable_event(self.session, event_type, mechanism, context) 531 532 def wait_on_event( 533 self, 534 in_event_type: constants.EventType, 535 timeout: int, 536 capture_timeout: bool = False, 537 ) -> WaitResponse: 538 """Waits for an occurrence of the specified event in this resource. 539 540 in_event_type : constants.EventType 541 Logical identifier of the event(s) to wait for. 542 timeout : int 543 Absolute time period in time units that the resource shall wait for 544 a specified event to occur before returning the time elapsed error. 545 The time unit is in milliseconds. None means waiting forever if 546 necessary. 547 capture_timeout : bool, optional 548 When True will not produce a VisaIOError(VI_ERROR_TMO) but instead 549 return a WaitResponse with timed_out=True. 550 551 Returns 552 ------- 553 WaitResponse 554 Object that contains event_type, context and ret value. 555 556 """ 557 try: 558 event_type, context, ret = self.visalib.wait_on_event( 559 self.session, in_event_type, timeout 560 ) 561 except errors.VisaIOError as exc: 562 if capture_timeout and exc.error_code == constants.StatusCode.error_timeout: 563 return WaitResponse( 564 in_event_type, 565 None, 566 constants.StatusCode.error_timeout, 567 self.visalib, 568 timed_out=True, 569 ) 570 raise 571 return WaitResponse(event_type, context, ret, self.visalib) 572 573 def lock( 574 self, 575 timeout: Union[float, Literal["default"]] = "default", 576 requested_key: Optional[str] = None, 577 ) -> str: 578 """Establish a shared lock to the resource. 579 580 Parameters 581 ---------- 582 timeout : Union[float, Literal["default"]], optional 583 Absolute time period (in milliseconds) that a resource waits to get 584 unlocked by the locking session before returning an error. 585 Defaults to "default" which means use self.timeout. 586 requested_key : Optional[str], optional 587 Access key used by another session with which you want your session 588 to share a lock or None to generate a new shared access key. 589 590 Returns 591 ------- 592 str 593 A new shared access key if requested_key is None, otherwise, same 594 value as the requested_key 595 596 """ 597 tout = cast(float, self.timeout if timeout == "default" else timeout) 598 clean_timeout = util.cleanup_timeout(tout) 599 return self.visalib.lock( 600 self.session, constants.Lock.shared, clean_timeout, requested_key 601 )[0] 602 603 def lock_excl(self, timeout: Union[float, Literal["default"]] = "default") -> None: 604 """Establish an exclusive lock to the resource. 605 606 Parameters 607 ---------- 608 timeout : Union[float, Literal["default"]], optional 609 Absolute time period (in milliseconds) that a resource waits to get 610 unlocked by the locking session before returning an error. 611 Defaults to "default" which means use self.timeout. 612 613 """ 614 tout = cast(float, self.timeout if timeout == "default" else timeout) 615 clean_timeout = util.cleanup_timeout(tout) 616 self.visalib.lock(self.session, constants.Lock.exclusive, clean_timeout, None) 617 618 def unlock(self) -> None: 619 """Relinquishes a lock for the specified resource.""" 620 self.visalib.unlock(self.session) 621 622 @contextlib.contextmanager 623 def lock_context( 624 self, 625 timeout: Union[float, Literal["default"]] = "default", 626 requested_key: Optional[str] = "exclusive", 627 ) -> Iterator[Optional[str]]: 628 """A context that locks 629 630 Parameters 631 ---------- 632 timeout : Union[float, Literal["default"]], optional 633 Absolute time period (in milliseconds) that a resource waits to get 634 unlocked by the locking session before returning an error. 635 Defaults to "default" which means use self.timeout. 636 requested_key : Optional[str], optional 637 When using default of 'exclusive' the lock is an exclusive lock. 638 Otherwise it is the access key for the shared lock or None to 639 generate a new shared access key. 640 641 Yields 642 ------ 643 Optional[str] 644 The access_key if applicable. 645 646 """ 647 if requested_key == "exclusive": 648 self.lock_excl(timeout) 649 access_key = None 650 else: 651 access_key = self.lock(timeout, requested_key) 652 try: 653 yield access_key 654 finally: 655 self.unlock() 656 657 658Resource.register(constants.InterfaceType.unknown, "")(Resource) 659