# -*- coding: utf-8 -*-
"""Common test case for all message based resources.

"""
import ctypes
import gc
import logging
import time

import pytest

from pyvisa import constants, errors
from pyvisa.constants import EventType, ResourceAttribute
from pyvisa.resources import Resource

from .resource_utils import (
    EventAwareResourceTestCaseMixin,
    LockableResourceTestCaseMixin,
    ResourceTestCase,
)

try:
    import numpy as np  # type: ignore
except ImportError:
    np = None


def _wait_for_event(handler, timeout=2.0, poll_interval=0.1):
    """Block until ``handler.event_success`` is set or ``timeout`` elapses.

    Parameters
    ----------
    handler : EventHandler
        Handler whose ``event_success`` flag is polled.
    timeout : float
        Maximum time to wait, in seconds.
    poll_interval : float
        Sleep duration between two checks of the flag, in seconds.

    The function returns silently on timeout; callers assert on the handler
    flags afterwards.

    """
    # A monotonic clock is used so that wall-clock adjustments cannot
    # shorten or lengthen the wait.
    deadline = time.monotonic() + timeout
    while not handler.event_success and time.monotonic() <= deadline:
        time.sleep(poll_interval)


class EventHandler:
    """Event handler recording which kind of event it received."""

    def __init__(self) -> None:
        # Set as soon as any event is received.
        self.event_success = False
        # Set when a service request event is received.
        self.srq_success = False
        # Set when an I/O completion event is received.
        self.io_completed = False
        # User handle and session passed along with the last event.
        self.handle = None
        self.session = None

    def _record(self, event_type):
        """Update the success flags according to the received event type."""
        self.event_success = True
        if event_type == EventType.service_request:
            self.srq_success = True
        elif event_type == EventType.io_completion:
            self.io_completed = True

    def handle_event(self, session, event_type, event, handle=None):
        """Event handler

        Ctypes handlers are expected to return an integer.

        """
        self.session = session
        self.handle = handle
        self._record(event_type)
        return 0

    def simplified_handler(self, resource, event, handle=None):
        """Simplified handler that can be wrapped."""
        self.session = resource.session
        self.handle = handle
        self._record(event.event_type)
        return None


class MessagebasedResourceTestCase(ResourceTestCase):
    """Base test case for all message based resources."""

    #: Type of resource being tested in this test case.
    #: See RESOURCE_ADDRESSES in the __init__.py file of this package for
    #: acceptable values
    RESOURCE_TYPE = ""

    # Any test involving communication first writes the data to the remote
    # end (called "glider" in the original note) after a RECEIVE command and
    # then asks it to send the data back with a SEND command.

    def setup_method(self):
        """Create a resource using the address matching the type."""
        super().setup_method()
        self.instr.write_termination = "\n"
        self.instr.read_termination = "\n"
        self.instr.timeout = 100

    def compare_user_handle(self, h1, h2):
        """Compare two user handles as passed to a callback.

        We need such an indirection because we cannot safely always return
        a Python object and most ctypes object do not compare equal.

        """
        if isinstance(h1, ctypes.Structure):
            return h1 == h2
        elif hasattr(h1, "value"):
            return h1.value == h2.value
        else:  # assume an array
            return all(i == j for i, j in zip(h1, h2))

    def test_encoding(self):
        """Test setting the string encoding."""
        assert self.instr.encoding == "ascii"
        self.instr.encoding = "utf-8"

        with pytest.raises(LookupError):
            self.instr.encoding = "test"

    def test_termchars(self):
        """Test modifying the termchars."""
        # Write termination
        self.instr.write_termination = "\r\n"
        assert self.instr.write_termination == "\r\n"

        # Read termination
        self.instr.read_termination = "\r\0"
        assert self.instr.read_termination == "\r\0"
        assert self.instr.get_visa_attribute(ResourceAttribute.termchar) == ord("\0")
        assert self.instr.get_visa_attribute(ResourceAttribute.termchar_enabled)

        # Disable read termination
        self.instr.read_termination = None
        assert self.instr.get_visa_attribute(ResourceAttribute.termchar) == ord("\n")
        assert not self.instr.get_visa_attribute(ResourceAttribute.termchar_enabled)

        # Ban repeated term chars
        with pytest.raises(ValueError):
            self.instr.read_termination = "\n\n"

    def test_write_raw_read_bytes(self):
        """Test writing raw data and reading a specific number of bytes."""
        # Reading all bytes at once
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"test\n")
        count = self.instr.write_raw(b"SEND\n")
        assert count == 5
        self.instr.flush(constants.VI_READ_BUF)
        msg = self.instr.read_bytes(5, chunk_size=2)
        assert msg == b"test\n"

        # Reading one byte at a time
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"test\n")
        self.instr.write_raw(b"SEND\n")
        for ch in b"test\n":
            assert self.instr.read_bytes(1) == ch.to_bytes(1, "little")

        # Breaking on termchar
        self.instr.read_termination = "\r"
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"te\rst\r\n")
        self.instr.write_raw(b"SEND\n")
        assert self.instr.read_bytes(100, break_on_termchar=True) == b"te\r"
        assert self.instr.read_bytes(100, break_on_termchar=True) == b"st\r"
        assert self.instr.read_bytes(1) == b"\n"

        # Breaking on end of message
        self.instr.read_termination = "\n"
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"test\n")
        self.instr.write_raw(b"SEND\n")
        assert self.instr.read_bytes(100, break_on_termchar=True) == b"test\n"

    def test_handling_exception_in_read_bytes(self, caplog):
        """Test handling exception in read_bytes (monkeypatching)"""

        def false_read(session, size):
            raise errors.VisaIOError(constants.VI_ERROR_ABORT)

        read = self.instr.visalib.read
        self.instr.visalib.read = false_read
        with caplog.at_level(logging.DEBUG):
            try:
                self.instr.read_bytes(1)
            except errors.VisaIOError:
                pass
            finally:
                # Always restore the real read so later tests are unaffected.
                self.instr.visalib.read = read
        # Look for the message in every captured record rather than at a
        # fixed index, which would break if any other record gets logged.
        assert any(
            "- exception while reading:" in record.getMessage()
            for record in caplog.records
        )

    def test_write_raw_read_raw(self):
        """Test writing raw data and reading an answer."""
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"test\n")
        self.instr.write_raw(b"SEND\n")
        assert self.instr.read_raw(size=2) == b"test\n"

    def test_clear(self):
        """Test clearing the incoming buffer."""
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"test\n")
        self.instr.write_raw(b"SEND\n")
        self.instr.clear()
        self.instr.timeout = 10
        with pytest.raises(errors.VisaIOError):
            self.instr.read_raw()

    def test_write_read(self):
        """Test writing and reading."""
        self.instr.write_termination = "\n"
        self.instr.read_termination = "\r\n"
        self.instr.write("RECEIVE")
        with pytest.warns(UserWarning):
            self.instr.write("test\r\n")
        count = self.instr.write("SEND")
        assert count == 5
        assert self.instr.read() == "test"

        # Missing termination chars
        self.instr.read_termination = "\r\n"
        self.instr.write("RECEIVE")
        self.instr.write("test")
        self.instr.write("SEND")
        with pytest.warns(Warning):
            assert self.instr.read() == "test\n"

        # Dynamic termination
        self.instr.write_termination = "\r"
        self.instr.write("RECEIVE\n", termination=False)
        self.instr.write("test\r", termination="\n")
        self.instr.write("SEND", termination="\n")
        assert self.instr.read(termination="\r") == "test"

        # Test query
        self.instr.write_termination = "\n"
        self.instr.write("RECEIVE")
        self.instr.write("test\r")
        # perf_counter is used like in the other delay tests of this file.
        tic = time.perf_counter()
        assert self.instr.query("SEND", delay=0.5) == "test"
        assert time.perf_counter() - tic > 0.49

        # Test handling repeated term char
        self.instr.read_termination = "\n"
        for char in ("\r", None):
            self.instr.write_termination = "\n" if char else "\r"
            self.instr.write("RECEIVE", termination="\n")
            with pytest.warns(Warning):
                self.instr.write("test\r", termination=char)
            self.instr.write("", termination="\n")
            self.instr.write("SEND", termination="\n")
            assert self.instr.read() == "test\r\r"

        # TODO not sure how to test encoding

    def test_handling_exception_in_read_raw(self, caplog):
        """Test handling an exception raised while reading (monkeypatching)"""

        def false_read(session, size):
            raise errors.VisaIOError(constants.VI_ERROR_ABORT)

        read = self.instr.visalib.read
        self.instr.visalib.read = false_read
        with caplog.at_level(logging.DEBUG):
            try:
                self.instr.read()
            except errors.VisaIOError:
                pass
            finally:
                # Always restore the real read so later tests are unaffected.
                self.instr.visalib.read = read

        assert caplog.records

    def test_write_ascii_values(self):
        """Test writing ascii values."""
        # Standard separator
        values = [1, 2, 3, 4, 5]
        self.instr.write("RECEIVE")
        count = self.instr.write_ascii_values("", values, "d")
        assert count == 10
        self.instr.write("SEND")
        assert self.instr.read() == "1,2,3,4,5"

        # Non standard separator and termination
        self.instr.write_termination = "\r"
        self.instr.write("RECEIVE", termination="\n")
        self.instr.write_ascii_values("", values, "d", separator=";", termination=False)
        self.instr.write("", termination="\n")
        self.instr.write("SEND", termination="\n")
        assert self.instr.read() == "1;2;3;4;5"

        # Test handling repeated term char
        for char in ("\r", None):
            self.instr.write_termination = "\n" if char else "\r"
            self.instr.write("RECEIVE", termination="\n")
            with pytest.warns(Warning):
                self.instr.write_ascii_values(
                    "\r", values, "s", separator=";", termination=char
                )
            self.instr.write("", termination="\n")
            self.instr.write("SEND", termination="\n")
            assert self.instr.read() == "\r1;2;3;4;5\r"

    @pytest.mark.parametrize(
        "hfmt, prefix",
        [("ieee", b"#212"), ("hp", b"#A\x0c\x00"), ("empty", b"")],
    )
    def test_write_binary_values(self, hfmt, prefix):
        """Test writing binary data."""
        values = [1, 2, 3, 4, 5, 6]
        self.instr.write_termination = "\n"
        self.instr.write("RECEIVE")
        count = self.instr.write_binary_values("", values, "h", header_fmt=hfmt)
        # Each integer encoded as h uses 2 bytes
        assert count == len(prefix) + 12 + 1
        self.instr.write("SEND")
        msg = self.instr.read_bytes(13 + len(prefix))
        assert msg == prefix + b"\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\n"

        if hfmt == "hp":
            # The last two bytes of the hp header are swapped in the expected
            # big endian message.
            fl_prefix = prefix[0:2] + prefix[-2::][::-1]
        else:
            fl_prefix = prefix
        self.instr.write_termination = "\r"
        self.instr.write("RECEIVE", termination="\n")
        self.instr.write_binary_values(
            "", values, "h", is_big_endian=True, termination=False, header_fmt=hfmt
        )
        self.instr.write("", termination="\n")
        self.instr.write("SEND", termination="\n")
        assert (
            self.instr.read_bytes(13 + len(prefix))
            == fl_prefix + b"\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\n"
        )

        # Test handling repeated term char
        for char in ("\r", None):
            self.instr.write_termination = "\n" if char else "\r"
            self.instr.write("RECEIVE", termination="\n")
            with pytest.warns(Warning):
                self.instr.write_binary_values(
                    "\r", values, "h", header_fmt=hfmt, termination=char
                )
            self.instr.write("", termination="\n")
            self.instr.write("SEND", termination="\n")
            msg = self.instr.read()
            assert (
                msg
                == "\r"
                + prefix.decode("ascii")
                + "\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\r"
            )

        # Wrong header format
        with pytest.raises(ValueError):
            self.instr.write_binary_values("", values, "h", header_fmt="zxz")

    def test_read_ascii_values(self):
        """Test reading ascii values."""
        # Standard separator
        self.instr.write("RECEIVE")
        self.instr.write("1,2,3,4,5")
        self.instr.write("SEND")
        values = self.instr.read_ascii_values()
        assert type(values[0]) is float
        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]

        # Non standard separator and termination
        self.instr.write("RECEIVE")
        self.instr.write("1;2;3;4;5")
        tic = time.perf_counter()
        values = self.instr.query_ascii_values(
            "SEND", converter="d", separator=";", delay=0.5
        )
        # Same small margin as the query delay check in test_write_read.
        assert time.perf_counter() - tic > 0.49
        assert type(values[0]) is int
        assert values == [1, 2, 3, 4, 5]

        # Numpy container
        if np is not None:
            self.instr.write("RECEIVE")
            self.instr.write("1,2,3,4,5")
            self.instr.write("SEND")
            values = self.instr.read_ascii_values(container=np.array)
            expected = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
            # dtypes are compared by equality, not by identity.
            assert values.dtype == expected.dtype
            np.testing.assert_array_equal(values, expected)

    @pytest.mark.parametrize("hfmt", ("ieee", "hp"))
    def test_read_binary_values(self, hfmt):
        """Test reading binary data."""
        # TODO test handling binary decoding issue (troublesome)
        self.instr.read_termination = "\r"
        # 3328 in binary short is \x00\r this way we can interrupt the
        # transmission midway to test some corner cases
        data = [1, 2, 3328, 3, 4, 5, 6, 7]

        self.instr.write("RECEIVE")
        self.instr.write_binary_values(
            "", data, "h", header_fmt=hfmt, termination="\r\n"
        )
        self.instr.write("SEND")
        new = self.instr.read_binary_values(
            datatype="h",
            is_big_endian=False,
            header_fmt=hfmt,
            expect_termination=True,
            chunk_size=8,
        )
        self.instr.read_bytes(1)
        assert data == new

        self.instr.write("RECEIVE")
        self.instr.write_binary_values(
            "", data, "h", header_fmt=hfmt, is_big_endian=True
        )

        new = self.instr.query_binary_values(
            "SEND",
            datatype="h",
            header_fmt=hfmt,
            is_big_endian=True,
            expect_termination=False,
            chunk_size=8,
            container=np.array if np is not None else list,
        )
        self.instr.read_bytes(1)
        if np is not None:
            np.testing.assert_array_equal(new, np.array(data, dtype=np.int16))
        else:
            assert data == new

    def test_read_query_binary_values_invalid_header(self):
        """Test we properly handle an invalid header."""
        data = [1, 2, 3328, 3, 4, 5, 6, 7]
        self.instr.write("RECEIVE")
        self.instr.write_binary_values(
            "", data, "h", header_fmt="ieee", is_big_endian=True
        )
        self.instr.write("SEND")
        with pytest.raises(ValueError):
            self.instr.read_binary_values(
                datatype="h",
                is_big_endian=False,
                header_fmt="invalid",
                expect_termination=True,
                chunk_size=8,
            )

        self.instr.write("RECEIVE")
        self.instr.write_binary_values(
            "", data, "h", header_fmt="ieee", is_big_endian=True
        )
        with pytest.raises(ValueError):
            self.instr.query_binary_values(
                "*IDN",
                datatype="h",
                is_big_endian=False,
                header_fmt="invalid",
                expect_termination=True,
                chunk_size=8,
            )

    # Not sure how to test this
    @pytest.mark.skip(reason="No known way to test malformed binary handling")
    def test_handling_malformed_binary(self):
        """Placeholder for testing the handling of malformed binary data."""
        pass

    @pytest.mark.parametrize(
        "hfmt, header",
        [("ieee", "#10"), ("hp", "#A\x00\x00"), ("empty", "")],
    )
    def test_read_binary_values_unreported_length(self, hfmt, header):
        """Test reading binary data whose header does not report a length."""
        self.instr.read_termination = "\r"
        # 3328 in binary short is \x00\r this way we can interrupt the
        # transmission midway to test some corner cases
        data = [1, 2, 3328, 3, 4, 5]

        self.instr.write("RECEIVE")
        self.instr.write(
            header + "\x01\x00\x02\x00\x00\r\x03\x00\x04\x00\x05\x00",
            termination="\r\n",
        )
        self.instr.write("SEND")
        new = self.instr.read_binary_values(
            datatype="h",
            is_big_endian=False,
            header_fmt=hfmt,
            expect_termination=True,
            chunk_size=6,
            data_points=6,
        )
        self.instr.read_bytes(1)
        assert data == new

        self.instr.write("RECEIVE")
        self.instr.write(
            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
            termination="\r\n",
        )
        new = self.instr.query_binary_values(
            "SEND",
            datatype="h",
            header_fmt=hfmt,
            is_big_endian=True,
            expect_termination=False,
            chunk_size=6,
            container=np.array if np is not None else list,
            data_points=6,
        )
        self.instr.read_bytes(1)
        if np is not None:
            np.testing.assert_array_equal(new, np.array(data, dtype=np.int16))
        else:
            assert data == new

        # Check we do error on unreported/unspecified length
        self.instr.write("RECEIVE")
        self.instr.write(
            header + "\x01\x00\x02\x00\x00\r\x03\x00\x04\x00\x05\x00",
            termination="\r\n",
        )
        self.instr.write("SEND")
        with pytest.raises(ValueError):
            self.instr.read_binary_values(
                datatype="h",
                is_big_endian=False,
                header_fmt=hfmt,
                expect_termination=True,
                chunk_size=6,
            )

    def test_delay_in_query_ascii(self):
        """Test handling of the delay argument in query_ascii_values."""
        # Test using the instrument wide delay
        self.instr.query_delay = 1.0
        self.instr.write("RECEIVE")
        self.instr.write("1,2,3,4,5")
        tic = time.perf_counter()
        values = self.instr.query_ascii_values("SEND")
        assert time.perf_counter() - tic > 0.99
        assert type(values[0]) is float
        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]

        # Test specifying the delay
        self.instr.query_delay = 0.0
        self.instr.write("RECEIVE")
        self.instr.write("1,2,3,4,5")
        tic = time.perf_counter()
        values = self.instr.query_ascii_values("SEND", delay=1.0)
        assert time.perf_counter() - tic > 0.99
        assert type(values[0]) is float
        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]

        # Test specifying a 0 delay
        self.instr.query_delay = 1.0
        self.instr.write("RECEIVE")
        self.instr.write("1,2,3,4,5")
        tic = time.perf_counter()
        values = self.instr.query_ascii_values("SEND", delay=0.0)
        assert time.perf_counter() - tic < 0.99
        assert type(values[0]) is float
        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]

    def test_instrument_wide_delay_in_query_binary(self):
        """Test handling the instrument wide delay in query_binary_values."""
        header = "#10"
        data = [1, 2, 3328, 3, 4, 5]
        # Test using the instrument wide delay
        self.instr.query_delay = 1.0
        self.instr.write("RECEIVE")
        self.instr.write(
            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
            termination="\r\n",
        )
        tic = time.perf_counter()
        new = self.instr.query_binary_values(
            "SEND",
            datatype="h",
            header_fmt="ieee",
            is_big_endian=True,
            expect_termination=False,
            chunk_size=6,
            data_points=6,
        )

        assert time.perf_counter() - tic > 0.99
        assert data == new

    def test_delay_args_in_query_binary(self):
        """Test handling of the delay argument in query_binary_values."""
        header = "#10"
        data = [1, 2, 3328, 3, 4, 5]
        self.instr.query_delay = 0.0
        self.instr.write("RECEIVE")
        self.instr.write(
            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
            termination="\r\n",
        )
        tic = time.perf_counter()
        new = self.instr.query_binary_values(
            "SEND",
            datatype="h",
            header_fmt="ieee",
            is_big_endian=True,
            expect_termination=False,
            chunk_size=6,
            data_points=6,
            delay=1.0,
        )

        assert time.perf_counter() - tic > 0.99
        assert data == new

    def test_no_delay_args_in_query_binary(self):
        """Test that a 0 delay argument overrides the instrument wide delay."""
        header = "#10"
        data = [1, 2, 3328, 3, 4, 5]
        self.instr.query_delay = 1.0
        self.instr.write("RECEIVE")
        self.instr.write(
            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
            termination="\r\n",
        )
        tic = time.perf_counter()
        new = self.instr.query_binary_values(
            "SEND",
            datatype="h",
            header_fmt="ieee",
            is_big_endian=True,
            expect_termination=False,
            chunk_size=6,
            data_points=6,
            delay=0.0,
        )

        assert time.perf_counter() - tic < 1.0
        assert data == new

    def test_stb(self):
        """Test reading the status byte."""
        # A byte can only hold values from 0 to 255.
        assert 0 <= self.instr.stb < 256
        assert 0 <= self.instr.read_stb() < 256


class EventAwareMessagebasedResourceTestCaseMixin(EventAwareResourceTestCaseMixin):
    """Mixin for message based resources supporting events."""

    def test_manually_called_handlers(self):
        """Test manually calling an event handler."""

        class FalseResource(Resource):
            session = None
            visalib = None
            _session = None

            def __init__(self):
                # Deliberately skip Resource.__init__: no session is opened.
                pass

        fres = FalseResource()

        handler = EventHandler()
        false_wrapped_handler = fres.wrap_handler(handler.simplified_handler)
        false_wrapped_handler(None, EventType.clear, 1, 1)
        assert handler.event_success

        # Calling the wrapped handler with a session that is not the one of
        # the resource is expected to fail.
        with pytest.raises(RuntimeError):
            false_wrapped_handler(1, EventType.clear, 1, 1)

    def test_handling_invalid_handler(self):
        """Test handling an error related to a wrong handler type."""
        with pytest.raises(errors.VisaTypeError):
            event_type = EventType.exception
            self.instr.install_handler(event_type, 1, object())

    def test_uninstalling_missing_visa_handler(self):
        """Test uninstalling a visa handler that was not registered."""
        handler1 = EventHandler()
        handler2 = EventHandler()
        event_type = EventType.exception
        self.instr.install_handler(event_type, handler1.handle_event)
        with pytest.raises(errors.UnknownHandler):
            self.instr.uninstall_handler(event_type, handler2.handle_event)

        self.instr.uninstall_handler(event_type, handler1.handle_event)

        with pytest.raises(errors.UnknownHandler):
            self.instr.uninstall_handler(event_type, handler2.handle_event)

    def test_handler_clean_up_on_resource_del(self):
        """Test that handlers are properly cleaned when a resource is deleted."""
        handler = EventHandler()
        event_type = EventType.exception
        self.instr.install_handler(event_type, handler.handle_event)

        # Drop the reference held by the test case and force a collection.
        self.instr = None
        gc.collect()
        assert not self.rm.visalib.handlers

    def test_uninstall_all_handlers(self):
        """Test uninstall all handlers from all sessions."""
        handler = EventHandler()
        event_type = EventType.exception
        self.instr.install_handler(event_type, handler.handle_event)

        self.rm.visalib.uninstall_all_visa_handlers(None)
        assert not self.rm.visalib.handlers

    def test_manual_async_read(self):
        """Test handling IOCompletion event which has extra attributes."""
        # Prepare message
        self.instr.write_raw(b"RECEIVE\n")
        self.instr.write_raw(b"test\n")
        self.instr.write_raw(b"SEND\n")

        # Enable event handling
        event_type = EventType.io_completion
        event_mech = constants.EventMechanism.queue
        wait_time = 2000  # set time that program waits to receive event
        self.instr.enable_event(event_type, event_mech, None)

        try:
            visalib = self.instr.visalib
            buffer, job_id, status_code = visalib.read_asynchronously(
                self.instr.session, 10
            )
            assert buffer is visalib.get_buffer_from_id(job_id)
            response = self.instr.wait_on_event(event_type, wait_time)
        finally:
            self.instr.disable_event(event_type, event_mech)

        assert response.event.status == constants.StatusCode.success
        assert bytes(buffer) == bytes(response.event.buffer)
        assert bytes(response.event.data) == b"test\n"
        assert response.event.return_count == 5
        assert response.event.operation_name == "viReadAsync"

    def test_getting_unknown_buffer(self):
        """Test getting a buffer with a wrong ID."""
        assert self.instr.visalib.get_buffer_from_id(1) is None

    def test_wait_on_event_timeout(self):
        """Test timing out while waiting on a VISA event."""
        event_type = EventType.service_request
        event_mech = constants.EventMechanism.queue
        # Emit a clear to avoid dealing with previous requests
        self.instr.clear()
        self.instr.enable_event(event_type, event_mech, None)
        try:
            response = self.instr.wait_on_event(event_type, 10, capture_timeout=True)
        finally:
            self.instr.disable_event(event_type, event_mech)
        assert response.timed_out
        assert response.event.event_type == event_type

        # Only the wait itself is allowed to raise: enabling/disabling the
        # event is kept out of the raises block so that an error there cannot
        # make the test pass for the wrong reason.
        self.instr.enable_event(event_type, event_mech, None)
        try:
            with pytest.raises(errors.VisaIOError):
                self.instr.wait_on_event(event_type, 10)
        finally:
            self.instr.disable_event(event_type, event_mech)

    def test_wait_on_event(self):
        """Test waiting on a VISA event."""
        event_type = EventType.service_request
        event_mech = constants.EventMechanism.queue
        wait_time = 2000  # set time that program waits to receive event
        self.instr.enable_event(event_type, event_mech, None)
        self.instr.write("RCVSLOWSRQ")
        self.instr.write("1")
        self.instr.write("SENDSLOWSRQ")
        try:
            response = self.instr.wait_on_event(event_type, wait_time)
        finally:
            self.instr.disable_event(event_type, event_mech)

        assert not response.timed_out
        assert response.event.event_type == EventType.service_request
        assert self.instr.read() == "1"

        # Accessing these attributes directly on the response is expected to
        # emit a FutureWarning.
        with pytest.warns(FutureWarning):
            response.event_type
        with pytest.warns(FutureWarning):
            response.context

    def test_managing_visa_handler(self):
        """Test using visa handlers."""

        def _test(handle):
            handler = EventHandler()
            event_type = EventType.service_request
            event_mech = constants.EventMechanism.handler
            user_handle = self.instr.install_handler(
                event_type, handler.handle_event, user_handle=handle
            )
            self.instr.enable_event(event_type, event_mech, None)
            self.instr.write("RCVSLOWSRQ")
            self.instr.write("1")
            self.instr.write("SENDSLOWSRQ")

            try:
                _wait_for_event(handler)
            finally:
                self.instr.disable_event(event_type, event_mech)
                self.instr.uninstall_handler(
                    event_type, handler.handle_event, user_handle
                )

            assert handler.session == self.instr.session
            assert self.compare_user_handle(handler.handle, user_handle)
            assert handler.srq_success
            assert self.instr.read() == "1"
            self.instr.clear()

        class Point(ctypes.Structure):
            _fields_ = [("x", ctypes.c_int), ("y", ctypes.c_int)]

            def __eq__(self, other):
                if type(self) is not type(other):
                    return False
                return self.x == other.x and self.y == other.y

        # Exercise the different kinds of user handle: numbers, string,
        # lists and a ctypes structure.
        for handle in (1, 1.0, "1", [1], [1.0], Point(1, 2)):
            _test(handle)

    def test_wrapping_handler(self):
        """Test wrapping a handler using a Resource."""

        handler = EventHandler()
        event_type = EventType.service_request
        event_mech = constants.EventMechanism.handler
        wrapped_handler = self.instr.wrap_handler(handler.simplified_handler)
        user_handle = self.instr.install_handler(event_type, wrapped_handler, 1)
        self.instr.enable_event(event_type, event_mech, None)
        self.instr.write("RCVSLOWSRQ")
        self.instr.write("1")
        self.instr.write("SENDSLOWSRQ")

        try:
            _wait_for_event(handler)
        finally:
            self.instr.disable_event(event_type, event_mech)
            self.instr.uninstall_handler(event_type, wrapped_handler, user_handle)

        assert self.instr.session == handler.session
        assert self.compare_user_handle(handler.handle, user_handle)
        assert handler.srq_success
        assert self.instr.read() == "1"

    def test_bare_handler(self):
        """Test using a bare handler passing raw backend values."""
        from pyvisa import ctwrapper

        if not isinstance(self.instr.visalib, ctwrapper.IVIVisaLibrary):
            # Report the test as skipped rather than silently passing.
            pytest.skip("Bare handlers are only tested with an IVIVisaLibrary")

        ctwrapper.WRAP_HANDLER = False
        try:
            handler = EventHandler()
            event_type = EventType.service_request
            event_mech = constants.EventMechanism.handler
            user_handle = self.instr.install_handler(
                event_type, handler.handle_event, 1
            )
            self.instr.enable_event(event_type, event_mech, None)
            self.instr.write("RCVSLOWSRQ")
            self.instr.write("1")
            self.instr.write("SENDSLOWSRQ")

            try:
                _wait_for_event(handler)
            finally:
                self.instr.disable_event(event_type, event_mech)
                self.instr.uninstall_handler(
                    event_type, handler.handle_event, user_handle
                )

            # Without wrapping, the handler receives raw backend values that
            # need to be unpacked before comparison.
            assert self.instr.session == handler.session.value
            assert self.compare_user_handle(handler.handle.contents, user_handle)
            assert handler.srq_success
            assert self.instr.read() == "1"
        finally:
            # Restore the module level flag whatever the outcome.
            ctwrapper.WRAP_HANDLER = True


class LockableMessagedBasedResourceTestCaseMixin(LockableResourceTestCaseMixin):
    """Mixin for message based resources supporting locking."""

    def test_shared_locking(self):
        """Test locking/unlocking a resource with a shared lock."""
        instr2 = self.rm.open_resource(str(self.rname))
        instr3 = self.rm.open_resource(str(self.rname))

        # The extra sessions are closed in the finally clause so that they do
        # not outlive the test, even when an assertion fails.
        try:
            key = self.instr.lock()
            instr2.lock(requested_key=key)

            assert self.instr.query("*IDN?")
            assert instr2.query("*IDN?")
            with pytest.raises(errors.VisaIOError):
                instr3.query("*IDN?")

            # Share the lock for a limited time
            with instr3.lock_context(requested_key=key) as key2:
                assert instr3.query("*IDN?")
                assert key == key2

            # Stop sharing the lock
            instr2.unlock()

            with pytest.raises(errors.VisaIOError):
                instr2.query("*IDN?")
            with pytest.raises(errors.VisaIOError):
                instr3.query("*IDN?")

            self.instr.unlock()

            assert instr3.query("*IDN?")
        finally:
            instr3.close()
            instr2.close()

    def test_exclusive_locking(self):
        """Test locking/unlocking a resource with an exclusive lock."""
        instr2 = self.rm.open_resource(str(self.rname))

        # The extra session is closed in the finally clause so that it does
        # not outlive the test, even when an assertion fails.
        try:
            self.instr.lock_excl()
            with pytest.raises(errors.VisaIOError):
                instr2.query("*IDN?")

            self.instr.unlock()

            assert instr2.query("*IDN?")

            # Hold the exclusive lock for a limited time
            with self.instr.lock_context(requested_key="exclusive") as key:
                assert key is None
                with pytest.raises(errors.VisaIOError):
                    instr2.query("*IDN?")
        finally:
            instr2.close()