1# -*- coding: utf-8 -*-
2"""Common test case for all message based resources.
3
4"""
5import ctypes
6import gc
7import logging
8import time
9
10import pytest
11
12from pyvisa import constants, errors
13from pyvisa.constants import EventType, ResourceAttribute
14from pyvisa.resources import Resource
15
16from .resource_utils import (
17    EventAwareResourceTestCaseMixin,
18    LockableResourceTestCaseMixin,
19    ResourceTestCase,
20)
21
22try:
23    import numpy as np  # type: ignore
24except ImportError:
25    np = None
26
27
class EventHandler:
    """Record what happened when a VISA event callback was invoked.

    The instance keeps flags describing which kind of event was seen, along
    with the session and user handle received by the last callback.

    """

    def __init__(self) -> None:
        # Set once any event reached one of the handlers.
        self.event_success = False
        # Set when a service request event is received.
        self.srq_success = False
        # Set when an I/O completion event is received.
        self.io_completed = False
        # User handle and session passed to the most recent callback.
        self.handle = None
        self.session = None

    def _record(self, event_type) -> None:
        """Update the success flags according to the event type."""
        if event_type == EventType.service_request:
            self.srq_success = True
        elif event_type == EventType.io_completion:
            self.io_completed = True
        self.event_success = True

    def handle_event(self, session, event_type, event, handle=None):
        """Event handler receiving the session and event type directly.

        Ctypes handlers are expected to return an integer, hence the 0.

        """
        self.session = session
        self.handle = handle
        self._record(event_type)
        return 0

    def simplified_handler(self, resource, event, handle=None):
        """Simplified handler that can be wrapped.

        The session is read from the resource and the event type from the
        event object.

        """
        self.session = resource.session
        self.handle = handle
        self._record(event.event_type)
        return None
74
75
76class MessagebasedResourceTestCase(ResourceTestCase):
77    """Base test case for all message based resources."""
78
79    #: Type of resource being tested in this test case.
80    #: See RESOURCE_ADDRESSES in the __init__.py file of this package for
81    #: acceptable values
82    RESOURCE_TYPE = ""
83
    # Any test involving communication requires first writing the data to
    # glider and then requesting that it send the data back.
86
87    def setup_method(self):
88        """Create a resource using the address matching the type."""
89        super().setup_method()
90        self.instr.write_termination = "\n"
91        self.instr.read_termination = "\n"
92        self.instr.timeout = 100
93
94    def compare_user_handle(self, h1, h2):
95        """Function comparing to user handle as passed to a callback.
96
97        We need such an indirection because we cannot safely always return
98        a Python object and most ctypes object do not compare equal.
99
100        """
101        if isinstance(h1, ctypes.Structure):
102            return h1 == h2
103        elif hasattr(h1, "value"):
104            return h1.value == h2.value
105        else:  # assume an array
106            return all((i == j for i, j in zip(h1, h2)))
107
108    def test_encoding(self):
109        """Tets setting the string encoding."""
110        assert self.instr.encoding == "ascii"
111        self.instr.encoding = "utf-8"
112
113        with pytest.raises(LookupError):
114            self.instr.encoding = "test"
115
116    def test_termchars(self):
117        """Test modifying the termchars."""
118        # Write termination
119        self.instr.write_termination = "\r\n"
120        assert self.instr.write_termination == "\r\n"
121
122        self.instr.read_termination = "\r\0"
123        assert self.instr.read_termination == "\r\0"
124        assert self.instr.get_visa_attribute(ResourceAttribute.termchar) == ord("\0")
125        assert self.instr.get_visa_attribute(ResourceAttribute.termchar_enabled)
126
127        # Disable read termination
128        self.instr.read_termination = None
129        assert self.instr.get_visa_attribute(ResourceAttribute.termchar) == ord("\n")
130        assert not self.instr.get_visa_attribute(ResourceAttribute.termchar_enabled)
131
132        # Ban repeated term chars
133        with pytest.raises(ValueError):
134            self.instr.read_termination = "\n\n"
135
136    def test_write_raw_read_bytes(self):
137        """Test writing raw data and reading a specific number of bytes."""
138        # Reading all bytes at once
139        self.instr.write_raw(b"RECEIVE\n")
140        self.instr.write_raw(b"test\n")
141        count = self.instr.write_raw(b"SEND\n")
142        assert count == 5
143        self.instr.flush(constants.VI_READ_BUF)
144        msg = self.instr.read_bytes(5, chunk_size=2)
145        assert msg == b"test\n"
146
147        # Reading one byte at a time
148        self.instr.write_raw(b"RECEIVE\n")
149        self.instr.write_raw(b"test\n")
150        self.instr.write_raw(b"SEND\n")
151        for ch in b"test\n":
152            assert self.instr.read_bytes(1) == ch.to_bytes(1, "little")
153
154        # Breaking on termchar
155        self.instr.read_termination = "\r"
156        self.instr.write_raw(b"RECEIVE\n")
157        self.instr.write_raw(b"te\rst\r\n")
158        self.instr.write_raw(b"SEND\n")
159        assert self.instr.read_bytes(100, break_on_termchar=True) == b"te\r"
160        assert self.instr.read_bytes(100, break_on_termchar=True) == b"st\r"
161        assert self.instr.read_bytes(1) == b"\n"
162
163        # Breaking on end of message
164        self.instr.read_termination = "\n"
165        self.instr.write_raw(b"RECEIVE\n")
166        self.instr.write_raw(b"test\n")
167        self.instr.write_raw(b"SEND\n")
168        assert self.instr.read_bytes(100, break_on_termchar=True) == b"test\n"
169
170    def test_handling_exception_in_read_bytes(self, caplog):
171        """Test handling exception in read_bytes (monkeypatching)"""
172
173        def false_read(session, size):
174            raise errors.VisaIOError(constants.VI_ERROR_ABORT)
175
176        read = self.instr.visalib.read
177        self.instr.visalib.read = false_read
178        with caplog.at_level(logging.DEBUG):
179            try:
180                self.instr.read_bytes(1)
181            except errors.VisaIOError:
182                pass
183            finally:
184                self.instr.visalib.read = read
185        assert "- exception while reading:" in caplog.records[1].message
186
187    def test_write_raw_read_raw(self):
188        """Test writing raw data and reading an answer."""
189        self.instr.write_raw(b"RECEIVE\n")
190        self.instr.write_raw(b"test\n")
191        self.instr.write_raw(b"SEND\n")
192        assert self.instr.read_raw(size=2) == b"test\n"
193
194    def test_clear(self):
195        """Test clearing the incoming buffer."""
196        self.instr.write_raw(b"RECEIVE\n")
197        self.instr.write_raw(b"test\n")
198        self.instr.write_raw(b"SEND\n")
199        self.instr.clear()
200        self.instr.timeout = 10
201        with pytest.raises(errors.VisaIOError):
202            self.instr.read_raw()
203
204    def test_write_read(self):
205        """Test writing and reading."""
206        self.instr.write_termination = "\n"
207        self.instr.read_termination = "\r\n"
208        self.instr.write("RECEIVE")
209        with pytest.warns(UserWarning):
210            self.instr.write("test\r\n")
211        count = self.instr.write("SEND")
212        assert count == 5
213        assert self.instr.read() == "test"
214
215        # Missing termination chars
216        self.instr.read_termination = "\r\n"
217        self.instr.write("RECEIVE")
218        self.instr.write("test")
219        self.instr.write("SEND")
220        with pytest.warns(Warning):
221            assert self.instr.read() == "test\n"
222
223        # Dynamic termination
224        self.instr.write_termination = "\r"
225        self.instr.write("RECEIVE\n", termination=False)
226        self.instr.write("test\r", termination="\n")
227        self.instr.write("SEND", termination="\n")
228        assert self.instr.read(termination="\r") == "test"
229
230        # Test query
231        self.instr.write_termination = "\n"
232        self.instr.write("RECEIVE")
233        self.instr.write("test\r")
234        tic = time.time()
235        assert self.instr.query("SEND", delay=0.5) == "test"
236        assert time.time() - tic > 0.49
237
238        # Test handling repeated term char
239        self.instr.read_termination = "\n"
240        for char in ("\r", None):
241            self.instr.write_termination = "\n" if char else "\r"
242            self.instr.write("RECEIVE", termination="\n")
243            with pytest.warns(Warning):
244                self.instr.write("test\r", termination=char)
245            self.instr.write("", termination="\n")
246            self.instr.write("SEND", termination="\n")
247            assert self.instr.read() == "test\r\r"
248
249        # TODO not sure how to test encoding
250
251    def test_handling_exception_in_read_raw(self, caplog):
252        """Test handling exception in read_bytes (monkeypatching)"""
253
254        def false_read(session, size):
255            raise errors.VisaIOError(constants.VI_ERROR_ABORT)
256
257        read = self.instr.visalib.read
258        self.instr.visalib.read = false_read
259        with caplog.at_level(logging.DEBUG):
260            try:
261                self.instr.read()
262            except errors.VisaIOError:
263                pass
264            finally:
265                self.instr.visalib.read = read
266
267        assert caplog.records
268
269    def test_write_ascii_values(self):
270        """Test writing ascii values."""
271        # Standard separator
272        values = [1, 2, 3, 4, 5]
273        self.instr.write("RECEIVE")
274        count = self.instr.write_ascii_values("", values, "d")
275        assert count == 10
276        self.instr.write("SEND")
277        assert self.instr.read() == "1,2,3,4,5"
278
279        # Non standard separator and termination
280        self.instr.write_termination = "\r"
281        self.instr.write("RECEIVE", termination="\n")
282        self.instr.write_ascii_values("", values, "d", separator=";", termination=False)
283        self.instr.write("", termination="\n")
284        self.instr.write("SEND", termination="\n")
285        assert self.instr.read() == "1;2;3;4;5"
286
287        # Test handling repeated term char
288        for char in ("\r", None):
289            self.instr.write_termination = "\n" if char else "\r"
290            self.instr.write("RECEIVE", termination="\n")
291            with pytest.warns(Warning):
292                values = [1, 2, 3, 4, 5]
293                self.instr.write_ascii_values(
294                    "\r", values, "s", separator=";", termination=char
295                )
296            self.instr.write("", termination="\n")
297            self.instr.write("SEND", termination="\n")
298            assert self.instr.read() == "\r1;2;3;4;5\r"
299
300    @pytest.mark.parametrize(
301        "hfmt, prefix", zip(("ieee", "hp", "empty"), (b"#212", b"#A\x0c\x00", b""))
302    )
303    def test_write_binary_values(self, hfmt, prefix):
304        """Test writing binary data."""
305        values = [1, 2, 3, 4, 5, 6]
306        self.instr.write_termination = "\n"
307        self.instr.write("RECEIVE")
308        count = self.instr.write_binary_values("", values, "h", header_fmt=hfmt)
309        # Each interger encoded as h uses 2 bytes
310        assert count == len(prefix) + 12 + 1
311        self.instr.write("SEND")
312        msg = self.instr.read_bytes(13 + len(prefix))
313        assert msg == prefix + b"\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\n"
314
315        if hfmt == "hp":
316            fl_prefix = prefix[0:2] + prefix[-2::][::-1]
317        else:
318            fl_prefix = prefix
319        self.instr.write_termination = "\r"
320        self.instr.write("RECEIVE", termination="\n")
321        self.instr.write_binary_values(
322            "", values, "h", is_big_endian=True, termination=False, header_fmt=hfmt
323        )
324        self.instr.write("", termination="\n")
325        self.instr.write("SEND", termination="\n")
326        assert (
327            self.instr.read_bytes(13 + len(prefix))
328            == fl_prefix + b"\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\n"
329        )
330
331        # Test handling repeated term char
332        for char in ("\r", None):
333            self.instr.write_termination = "\n" if char else "\r"
334            self.instr.write("RECEIVE", termination="\n")
335            with pytest.warns(Warning):
336                self.instr.write_binary_values(
337                    "\r", values, "h", header_fmt=hfmt, termination=char
338                )
339            self.instr.write("", termination="\n")
340            self.instr.write("SEND", termination="\n")
341            msg = self.instr.read()
342            assert (
343                msg
344                == "\r"
345                + prefix.decode("ascii")
346                + "\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\r"
347            )
348
349        # Wrong header format
350        with pytest.raises(ValueError):
351            self.instr.write_binary_values("", values, "h", header_fmt="zxz")
352
353    def test_read_ascii_values(self):
354        """Test reading ascii values."""
355        # Standard separator
356        self.instr.write("RECEIVE")
357        self.instr.write("1,2,3,4,5")
358        self.instr.write("SEND")
359        values = self.instr.read_ascii_values()
360        assert type(values[0]) is float
361        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]
362
363        # Non standard separator and termination
364        self.instr.write("RECEIVE")
365        self.instr.write("1;2;3;4;5")
366        tic = time.time()
367        values = self.instr.query_ascii_values(
368            "SEND", converter="d", separator=";", delay=0.5
369        )
370        assert time.time() - tic > 0.5
371        assert type(values[0]) is int
372        assert values == [1, 2, 3, 4, 5]
373
374        # Numpy container
375        if np:
376            self.instr.write("RECEIVE")
377            self.instr.write("1,2,3,4,5")
378            self.instr.write("SEND")
379            values = self.instr.read_ascii_values(container=np.array)
380            expected = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
381            assert values.dtype is expected.dtype
382            np.testing.assert_array_equal(values, expected)
383
384    @pytest.mark.parametrize("hfmt", ("ieee", "hp"))
385    def test_read_binary_values(self, hfmt):
386        """Test reading binary data."""
387        # TODO test handling binary decoding issue (troublesome)
388        self.instr.read_termination = "\r"
389        # 3328 in binary short is \x00\r this way we can interrupt the
390        # transmission midway to test some corner cases
391        data = [1, 2, 3328, 3, 4, 5, 6, 7]
392
393        self.instr.write("RECEIVE")
394        self.instr.write_binary_values(
395            "", data, "h", header_fmt=hfmt, termination="\r\n"
396        )
397        self.instr.write("SEND")
398        new = self.instr.read_binary_values(
399            datatype="h",
400            is_big_endian=False,
401            header_fmt=hfmt,
402            expect_termination=True,
403            chunk_size=8,
404        )
405        self.instr.read_bytes(1)
406        assert data == new
407
408        self.instr.write("RECEIVE")
409        self.instr.write_binary_values(
410            "", data, "h", header_fmt=hfmt, is_big_endian=True
411        )
412
413        new = self.instr.query_binary_values(
414            "SEND",
415            datatype="h",
416            header_fmt=hfmt,
417            is_big_endian=True,
418            expect_termination=False,
419            chunk_size=8,
420            container=np.array if np else list,
421        )
422        self.instr.read_bytes(1)
423        if np:
424            np.testing.assert_array_equal(new, np.array(data, dtype=np.int16))
425        else:
426            assert data == new
427
428    def test_read_query_binary_values_invalid_header(self):
429        """Test we properly handle an invalid header."""
430        data = [1, 2, 3328, 3, 4, 5, 6, 7]
431        self.instr.write("RECEIVE")
432        self.instr.write_binary_values(
433            "", data, "h", header_fmt="ieee", is_big_endian=True
434        )
435        self.instr.write("SEND")
436        with pytest.raises(ValueError):
437            self.instr.read_binary_values(
438                datatype="h",
439                is_big_endian=False,
440                header_fmt="invalid",
441                expect_termination=True,
442                chunk_size=8,
443            )
444
445        self.instr.write("RECEIVE")
446        self.instr.write_binary_values(
447            "", data, "h", header_fmt="ieee", is_big_endian=True
448        )
449        with pytest.raises(ValueError):
450            self.instr.query_binary_values(
451                "*IDN",
452                datatype="h",
453                is_big_endian=False,
454                header_fmt="invalid",
455                expect_termination=True,
456                chunk_size=8,
457            )
458
459    # Not sure how to test this
460    @pytest.mark.skip
461    def test_handling_malformed_binary(self):
462        """"""
463        pass
464
465    @pytest.mark.parametrize(
466        "hfmt, header", zip(("ieee", "hp", "empty"), ("#10", "#A\x00\x00", ""))
467    )
468    def test_read_binary_values_unreported_length(self, hfmt, header):
469        """Test reading binary data."""
470        self.instr.read_termination = "\r"
471        # 3328 in binary short is \x00\r this way we can interrupt the
472        # transmission midway to test some corner cases
473        data = [1, 2, 3328, 3, 4, 5]
474
475        self.instr.write("RECEIVE")
476        self.instr.write(
477            header + "\x01\x00\x02\x00\x00\r\x03\x00\x04\x00\x05\x00",
478            termination="\r\n",
479        )
480        self.instr.write("SEND")
481        new = self.instr.read_binary_values(
482            datatype="h",
483            is_big_endian=False,
484            header_fmt=hfmt,
485            expect_termination=True,
486            chunk_size=6,
487            data_points=6,
488        )
489        self.instr.read_bytes(1)
490        assert data == new
491
492        self.instr.write("RECEIVE")
493        self.instr.write(
494            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
495            termination="\r\n",
496        )
497        new = self.instr.query_binary_values(
498            "SEND",
499            datatype="h",
500            header_fmt=hfmt,
501            is_big_endian=True,
502            expect_termination=False,
503            chunk_size=6,
504            container=np.array if np else list,
505            data_points=6,
506        )
507        self.instr.read_bytes(1)
508        if np:
509            np.testing.assert_array_equal(new, np.array(data, dtype=np.int16))
510        else:
511            assert data == new
512
513        # Check we do error on unreported/unspecified length
514        self.instr.write("RECEIVE")
515        self.instr.write(
516            header + "\x01\x00\x02\x00\x00\r\x03\x00\x04\x00\x05\x00",
517            termination="\r\n",
518        )
519        self.instr.write("SEND")
520        with pytest.raises(ValueError):
521            self.instr.read_binary_values(
522                datatype="h",
523                is_big_endian=False,
524                header_fmt=hfmt,
525                expect_termination=True,
526                chunk_size=6,
527            )
528
529    def test_delay_in_query_ascii(self):
530        """Test handling of the delay argument in query_ascii_values."""
531        # Test using the instrument wide delay
532        self.instr.query_delay = 1.0
533        self.instr.write("RECEIVE")
534        self.instr.write("1,2,3,4,5")
535        tic = time.perf_counter()
536        values = self.instr.query_ascii_values("SEND")
537        assert time.perf_counter() - tic > 0.99
538        assert type(values[0]) is float
539        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]
540
541        # Test specifying the delay
542        self.instr.query_delay = 0.0
543        self.instr.write("RECEIVE")
544        self.instr.write("1,2,3,4,5")
545        tic = time.perf_counter()
546        values = self.instr.query_ascii_values("SEND", delay=1.0)
547        assert time.perf_counter() - tic > 0.99
548        assert type(values[0]) is float
549        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]
550
551        # Test specifying a 0 delay
552        self.instr.query_delay = 1.0
553        self.instr.write("RECEIVE")
554        self.instr.write("1,2,3,4,5")
555        tic = time.perf_counter()
556        values = self.instr.query_ascii_values("SEND", delay=0.0)
557        assert time.perf_counter() - tic < 0.99
558        assert type(values[0]) is float
559        assert values == [1.0, 2.0, 3.0, 4.0, 5.0]
560
561    def test_instrument_wide_delay_in_query_binary(self):
562        """Test handling delay in query_ascii_values."""
563        header = "#10"
564        data = [1, 2, 3328, 3, 4, 5]
565        # Test using the instrument wide delay
566        self.instr.query_delay = 1.0
567        self.instr.write("RECEIVE")
568        self.instr.write(
569            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
570            termination="\r\n",
571        )
572        tic = time.perf_counter()
573        new = self.instr.query_binary_values(
574            "SEND",
575            datatype="h",
576            header_fmt="ieee",
577            is_big_endian=True,
578            expect_termination=False,
579            chunk_size=6,
580            data_points=6,
581        )
582
583        assert time.perf_counter() - tic > 0.99
584        assert data == new
585
586    def test_delay_args_in_query_binary(self):
587        """Test handling of the delay argument in query_ascii_values."""
588        header = "#10"
589        data = [1, 2, 3328, 3, 4, 5]
590        self.instr.query_delay = 0.0
591        self.instr.write("RECEIVE")
592        self.instr.write(
593            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
594            termination="\r\n",
595        )
596        tic = time.perf_counter()
597        new = self.instr.query_binary_values(
598            "SEND",
599            datatype="h",
600            header_fmt="ieee",
601            is_big_endian=True,
602            expect_termination=False,
603            chunk_size=6,
604            data_points=6,
605            delay=1.0,
606        )
607
608        assert time.perf_counter() - tic > 0.99
609        assert data == new
610
611    def test_no_delay_args_in_query_binary(self):
612        """Test handling of the delay argument in query_ascii_values."""
613        header = "#10"
614        data = [1, 2, 3328, 3, 4, 5]
615        self.instr.query_delay = 1.0
616        self.instr.write("RECEIVE")
617        self.instr.write(
618            header + "\x00\x01\x00\x02\r\x00\x00\x03\x00\x04\x00\x05",
619            termination="\r\n",
620        )
621        tic = time.perf_counter()
622        new = self.instr.query_binary_values(
623            "SEND",
624            datatype="h",
625            header_fmt="ieee",
626            is_big_endian=True,
627            expect_termination=False,
628            chunk_size=6,
629            data_points=6,
630            delay=0.0,
631        )
632
633        assert time.perf_counter() - tic < 1.0
634        assert data == new
635
636    def test_stb(self):
637        """Test reading the status byte."""
638        assert 0 <= self.instr.stb <= 256
639        assert 0 <= self.instr.read_stb() <= 256
640
641
642class EventAwareMessagebasedResourceTestCaseMixin(EventAwareResourceTestCaseMixin):
643    """Mixin for message based resources supporting events."""
644
645    def test_manually_called_handlers(self):
646        """Test calling manually even handler."""
647
648        class FalseResource(Resource):
649            session = None
650            visalib = None
651            _session = None
652
653            def __init__(self):
654                pass
655
656        fres = FalseResource()
657        fres2 = FalseResource()
658        fres2.session = 1
659
660        handler = EventHandler()
661        false_wrapped_handler = fres.wrap_handler(handler.simplified_handler)
662        false_wrapped_handler(None, EventType.clear, 1, 1)
663        assert handler.event_success
664
665        with pytest.raises(RuntimeError):
666            false_wrapped_handler(1, EventType.clear, 1, 1)
667
668    def test_handling_invalid_handler(self):
669        """Test handling an error related to a wrong handler type."""
670        with pytest.raises(errors.VisaTypeError):
671            event_type = EventType.exception
672            self.instr.install_handler(event_type, 1, object())
673
674    def test_uninstalling_missing_visa_handler(self):
675        """Test uninstalling a visa handler that was not registered."""
676        handler1 = EventHandler()
677        handler2 = EventHandler()
678        event_type = EventType.exception
679        self.instr.install_handler(event_type, handler1.handle_event)
680        with pytest.raises(errors.UnknownHandler):
681            self.instr.uninstall_handler(event_type, handler2.handle_event)
682
683        self.instr.uninstall_handler(event_type, handler1.handle_event)
684
685        with pytest.raises(errors.UnknownHandler):
686            self.instr.uninstall_handler(event_type, handler2.handle_event)
687
688    def test_handler_clean_up_on_resource_del(self):
689        """Test that handlers are properly cleaned when a resource is deleted."""
690        handler = EventHandler()
691        event_type = EventType.exception
692        self.instr.install_handler(event_type, handler.handle_event)
693
694        self.instr = None
695        gc.collect()
696        assert not self.rm.visalib.handlers
697
698    def test_uninstall_all_handlers(self):
699        """Test uninstall all handlers from all sessions."""
700        handler = EventHandler()
701        event_type = EventType.exception
702        self.instr.install_handler(event_type, handler.handle_event)
703
704        self.rm.visalib.uninstall_all_visa_handlers(None)
705        assert not self.rm.visalib.handlers
706
707    def test_manual_async_read(self):
708        """Test handling IOCompletion event which has extra attributes."""
709        # Prepare message
710        self.instr.write_raw(b"RECEIVE\n")
711        self.instr.write_raw(b"test\n")
712        self.instr.write_raw(b"SEND\n")
713
714        # Enable event handling
715        event_type = EventType.io_completion
716        event_mech = constants.EventMechanism.queue
717        wait_time = 2000  # set time that program waits to receive event
718        self.instr.enable_event(event_type, event_mech, None)
719
720        try:
721            visalib = self.instr.visalib
722            buffer, job_id, status_code = visalib.read_asynchronously(
723                self.instr.session, 10
724            )
725            assert buffer is visalib.get_buffer_from_id(job_id)
726            response = self.instr.wait_on_event(event_type, wait_time)
727        finally:
728            self.instr.disable_event(event_type, event_mech)
729
730        assert response.event.status == constants.StatusCode.success
731        assert bytes(buffer) == bytes(response.event.buffer)
732        assert bytes(response.event.data) == b"test\n"
733        assert response.event.return_count == 5
734        assert response.event.operation_name == "viReadAsync"
735
736    def test_getting_unknown_buffer(self):
737        """Test getting a buffer with a wrong ID."""
738        assert self.instr.visalib.get_buffer_from_id(1) is None
739
740    def test_wait_on_event_timeout(self):
741        """Test waiting on a VISA event."""
742        event_type = EventType.service_request
743        event_mech = constants.EventMechanism.queue
744        # Emit a clear to avoid dealing with previous requests
745        self.instr.clear()
746        self.instr.enable_event(event_type, event_mech, None)
747        try:
748            response = self.instr.wait_on_event(event_type, 10, capture_timeout=True)
749        finally:
750            self.instr.disable_event(event_type, event_mech)
751        assert response.timed_out
752        assert response.event.event_type == event_type
753
754        with pytest.raises(errors.VisaIOError):
755            self.instr.enable_event(event_type, event_mech, None)
756            try:
757                response = self.instr.wait_on_event(event_type, 10)
758            finally:
759                self.instr.disable_event(event_type, event_mech)
760
761    def test_wait_on_event(self):
762        """Test waiting on a VISA event."""
763        event_type = EventType.service_request
764        event_mech = constants.EventMechanism.queue
765        wait_time = 2000  # set time that program waits to receive event
766        self.instr.enable_event(event_type, event_mech, None)
767        self.instr.write("RCVSLOWSRQ")
768        self.instr.write("1")
769        self.instr.write("SENDSLOWSRQ")
770        try:
771            response = self.instr.wait_on_event(event_type, wait_time)
772        finally:
773            self.instr.disable_event(event_type, event_mech)
774
775        assert not response.timed_out
776        assert response.event.event_type == EventType.service_request
777        assert self.instr.read() == "1"
778
779        with pytest.warns(FutureWarning):
780            response.event_type
781        with pytest.warns(FutureWarning):
782            response.context
783
784    def test_managing_visa_handler(self):
785        """Test using visa handlers."""
786
787        def _test(handle):
788            handler = EventHandler()
789            event_type = EventType.service_request
790            event_mech = constants.EventMechanism.handler
791            user_handle = self.instr.install_handler(
792                event_type, handler.handle_event, user_handle=handle
793            )
794            self.instr.enable_event(event_type, event_mech, None)
795            self.instr.write("RCVSLOWSRQ")
796            self.instr.write("1")
797            self.instr.write("SENDSLOWSRQ")
798
799            try:
800                t1 = time.time()
801                while not handler.event_success:
802                    if (time.time() - t1) > 2:
803                        break
804                    time.sleep(0.1)
805            finally:
806                self.instr.disable_event(event_type, event_mech)
807                self.instr.uninstall_handler(
808                    event_type, handler.handle_event, user_handle
809                )
810
811            assert handler.session == self.instr.session
812            assert self.compare_user_handle(handler.handle, user_handle)
813            assert handler.srq_success
814            assert self.instr.read() == "1"
815            self.instr.clear()
816
817        class Point(ctypes.Structure):
818            _fields_ = [("x", ctypes.c_int), ("y", ctypes.c_int)]
819
820            def __eq__(self, other):
821                if type(self) is not type(other):
822                    return False
823                return self.x == other.x and self.y == other.y
824
825        for handle in (1, 1.0, "1", [1], [1.0], Point(1, 2)):
826            print(handle)
827            _test(handle)
828
829    def test_wrapping_handler(self):
830        """Test wrapping a handler using a Resource."""
831
832        handler = EventHandler()
833        event_type = EventType.service_request
834        event_mech = constants.EventMechanism.handler
835        wrapped_handler = self.instr.wrap_handler(handler.simplified_handler)
836        user_handle = self.instr.install_handler(event_type, wrapped_handler, 1)
837        self.instr.enable_event(event_type, event_mech, None)
838        self.instr.write("RCVSLOWSRQ")
839        self.instr.write("1")
840        self.instr.write("SENDSLOWSRQ")
841
842        try:
843            t1 = time.time()
844            while not handler.event_success:
845                if (time.time() - t1) > 2:
846                    break
847                time.sleep(0.1)
848        finally:
849            self.instr.disable_event(event_type, event_mech)
850            self.instr.uninstall_handler(event_type, wrapped_handler, user_handle)
851
852        assert self.instr.session == handler.session
853        assert self.compare_user_handle(handler.handle, user_handle)
854        assert handler.srq_success
855        assert self.instr.read() == "1"
856
857    def test_bare_handler(self):
858        """Test using a bare handler passing raw backend values."""
859        from pyvisa import ctwrapper
860
861        if not isinstance(self.instr.visalib, ctwrapper.IVIVisaLibrary):
862            return
863
864        ctwrapper.WRAP_HANDLER = False
865        try:
866            handler = EventHandler()
867            event_type = EventType.service_request
868            event_mech = constants.EventMechanism.handler
869            user_handle = self.instr.install_handler(
870                event_type, handler.handle_event, 1
871            )
872            self.instr.enable_event(event_type, event_mech, None)
873            self.instr.write("RCVSLOWSRQ")
874            self.instr.write("1")
875            self.instr.write("SENDSLOWSRQ")
876
877            try:
878                t1 = time.time()
879                while not handler.event_success:
880                    if (time.time() - t1) > 2:
881                        break
882                    time.sleep(0.1)
883            finally:
884                self.instr.disable_event(event_type, event_mech)
885                self.instr.uninstall_handler(
886                    event_type, handler.handle_event, user_handle
887                )
888
889            assert self.instr.session == handler.session.value
890            assert self.compare_user_handle(handler.handle.contents, user_handle)
891            assert handler.srq_success
892            assert self.instr.read() == "1"
893        finally:
894            ctwrapper.WRAP_HANDLER = True
895
896
class LockableMessagedBasedResourceTestCaseMixin(LockableResourceTestCaseMixin):
    """Mixin for message based resources supporting locking."""

    def test_shared_locking(self):
        """Test sharing a lock between several sessions on the same resource.

        Two extra sessions are opened on the resource under test: one shares
        the lock for most of the test, the other only borrows it through
        ``lock_context``.

        """
        instr2 = self.rm.open_resource(str(self.rname))
        instr3 = self.rm.open_resource(str(self.rname))
        try:
            key = self.instr.lock()
            instr2.lock(requested_key=key)

            # Sessions holding the shared key can communicate, others cannot.
            assert self.instr.query("*IDN?")
            assert instr2.query("*IDN?")
            with pytest.raises(errors.VisaIOError):
                instr3.query("*IDN?")

            # Share the lock for a limited time
            with instr3.lock_context(requested_key=key) as key2:
                assert instr3.query("*IDN?")
                assert key == key2

            # Stop sharing the lock
            instr2.unlock()

            with pytest.raises(errors.VisaIOError):
                instr2.query("*IDN?")
            with pytest.raises(errors.VisaIOError):
                instr3.query("*IDN?")

            self.instr.unlock()

            # Once the lock is fully released any session can communicate.
            assert instr3.query("*IDN?")
        finally:
            # Always release the extra sessions, even if an assertion failed.
            instr3.close()
            instr2.close()

    def test_exclusive_locking(self):
        """Test taking and releasing an exclusive lock on a resource.

        The exclusive lock is taken first through ``lock_excl`` and then
        through ``lock_context``; in both cases a second session must be
        unable to communicate while the lock is held.

        """
        instr2 = self.rm.open_resource(str(self.rname))
        try:
            self.instr.lock_excl()
            with pytest.raises(errors.VisaIOError):
                instr2.query("*IDN?")

            self.instr.unlock()

            assert instr2.query("*IDN?")

            # Hold the exclusive lock for a limited time
            with self.instr.lock_context(requested_key="exclusive") as key:
                assert key is None
                with pytest.raises(errors.VisaIOError):
                    instr2.query("*IDN?")
        finally:
            # Always release the extra session, even if an assertion failed.
            instr2.close()
947