1# Copyright (c) 2016 Ultimaker B.V.
2# Uranium is released under the terms of the LGPLv3 or higher.
3from unittest.mock import patch, MagicMock
4
5import pytest
6
7from UM import FlameProfiler
8from UM.Signal import Signal, signalemitter, postponeSignals, CompressTechnique
9from copy import deepcopy
10
class SignalReceiver:
    """Test helper that counts how many times its slot has been invoked."""

    def __init__(self) -> None:
        # Number of slot() invocations recorded so far.
        self._emit_count = 0

    def getEmitCount(self) -> int:
        """Return the number of times slot() has been called."""
        return self._emit_count

    def slot(self, *args, **kwargs) -> None:
        """Record one invocation; all positional and keyword arguments are ignored."""
        self._emit_count = self._emit_count + 1
20
21
def test_signalWithFlameProfiler():
    """A direct signal still reaches its slot while profile recording is switched on.

    UM.Signal._recordSignalNames is patched to return True for the duration
    of the test.
    """
    with patch("UM.Signal._recordSignalNames", MagicMock(return_value=True)):
        FlameProfiler.record_profile = True
        try:
            test = SignalReceiver()

            signal = Signal(type=Signal.Direct)
            signal.connect(test.slot)
            signal.emit()

            assert test.getEmitCount() == 1
        finally:
            # Always switch recording back off, even when the assertion fails,
            # so the module-level flag does not leak into other tests.
            FlameProfiler.record_profile = False
33
34
def test_doubleSignalWithFlameProfiler():
    """A signal chained to another signal delivers one emit to the slot with profile recording on."""
    FlameProfiler.record_profile = True
    try:
        test = SignalReceiver()

        signal = Signal(type=Signal.Direct)
        signal2 = Signal(type=Signal.Direct)
        signal.connect(test.slot)
        # Chain the signals: emitting signal2 should trigger signal, which calls the slot.
        signal2.connect(signal)
        signal2.emit()
        assert test.getEmitCount() == 1
    finally:
        # Always switch recording back off, even when the assertion fails,
        # so the module-level flag does not leak into other tests.
        FlameProfiler.record_profile = False
46
47
def test_signal():
    """Emitting a direct signal once calls the connected slot exactly once."""
    receiver = SignalReceiver()
    direct_signal = Signal(type=Signal.Direct)

    direct_signal.connect(receiver.slot)
    direct_signal.emit()

    emit_count = receiver.getEmitCount()
    assert emit_count == 1
56
def test_postponeEmitNoCompression():
    """With NoCompression, every postponed emit is delivered once the context exits."""
    receiver = SignalReceiver()
    postponed = Signal(type=Signal.Direct)
    postponed.connect(receiver.slot)

    with postponeSignals(postponed, compress=CompressTechnique.NoCompression):
        for _ in range(2):
            postponed.emit()
            # Nothing may reach the slot while we are still inside the context.
            assert receiver.getEmitCount() == 0

    # Both emits arrive after leaving the context.
    assert receiver.getEmitCount() == 2
68
69
def test_postponeEmitCompressSingle():
    """With CompressSingle, several postponed emits collapse into a single delivery."""
    receiver = SignalReceiver()
    postponed = Signal(type=Signal.Direct)
    postponed.connect(receiver.slot)

    with postponeSignals(postponed, compress=CompressTechnique.CompressSingle):
        for _ in range(2):
            postponed.emit()
            # Nothing may reach the slot while we are still inside the context.
            assert receiver.getEmitCount() == 0

    # Two emits were made, but only one is delivered.
    assert receiver.getEmitCount() == 1
81
def test_postponeEmitCompressPerParameterValue():
    """With CompressPerParameterValue, postponed emits are de-duplicated by their arguments."""
    receiver = SignalReceiver()
    postponed = Signal(type=Signal.Direct)
    postponed.connect(receiver.slot)

    with postponeSignals(postponed, compress=CompressTechnique.CompressPerParameterValue):
        for value in ("ZOMG", "ZOMG"):
            postponed.emit(value)
            # Nothing may reach the slot while we are still inside the context.
            assert receiver.getEmitCount() == 0
        postponed.emit("BEEP")

    # Three emits were made, two with the same argument, leaving two unique emits.
    assert receiver.getEmitCount() == 2
95
96
def test_connectWhilePostponed():
    """A connect() attempted while the signal is postponed must not take effect."""
    receiver = SignalReceiver()
    postponed = Signal(type=Signal.Direct)

    with postponeSignals(postponed):
        # Expected to be a no-op because the signal is currently postponed.
        postponed.connect(receiver.slot)
        postponed.emit()

    # The connection was never established, so the slot was never called.
    emit_count = receiver.getEmitCount()
    assert emit_count == 0
105
106
def test_disconnectWhilePostponed():
    """disconnect() and disconnectAll() attempted while postponed must not take effect."""
    receiver = SignalReceiver()
    postponed = Signal(type=Signal.Direct)
    postponed.connect(receiver.slot)

    with postponeSignals(postponed):
        # Both disconnect attempts are expected to be no-ops while postponed.
        postponed.disconnect(receiver.slot)
        postponed.disconnectAll()
        postponed.emit()

    # The slot stayed connected, so the postponed emit still reaches it.
    emit_count = receiver.getEmitCount()
    assert emit_count == 1
117
118
def test_disconnectAll():
    """After disconnectAll(), emitting no longer calls a previously connected slot."""
    receiver = SignalReceiver()
    direct_signal = Signal(type=Signal.Direct)

    direct_signal.connect(receiver.slot)
    direct_signal.disconnectAll()
    direct_signal.emit()

    emit_count = receiver.getEmitCount()
    assert emit_count == 0
127
128
def test_connectSelf():
    """Connecting a signal to itself must be ignored.

    There is no explicit assertion: if the self-connection were made, the
    emit would fail with a maximum recursion depth error.
    """
    self_connected = Signal(type=Signal.Direct)
    self_connected.connect(self_connected)
    self_connected.emit()
133
134
def test_deepCopy():
    """A deep-copied signal still delivers emits to the slot connected on the original."""
    receiver = SignalReceiver()
    original = Signal(type=Signal.Direct)
    original.connect(receiver.slot)

    duplicate = deepcopy(original)
    duplicate.emit()

    # Only the copy was emitted, yet the receiver should have been called once.
    emit_count = receiver.getEmitCount()
    assert emit_count == 1
145
146
def test_signalemitter():
    """Check the signalemitter decorator on classes with and without signals.

    A decorated class that declares a Signal must be instantiable, and each
    instance must expose a signal that differs from the class-level one.
    Decorating a class that declares no signals must raise TypeError.
    """
    def declare_signalemitter():
        @signalemitter
        class Test:
            testSignal = Signal()

        return Test

    cls = declare_signalemitter()
    assert cls is not None

    inst = cls()
    # Check the instance here; the class itself was already checked above.
    assert inst is not None

    assert hasattr(inst, "testSignal")
    # The instance must have its own signal rather than sharing the class attribute.
    assert inst.testSignal != cls.testSignal

    def declare_bad_signalemitter():
        @signalemitter
        class Test:
            pass

        return Test

    # The decorator is applied at class-definition time, so the error surfaces
    # as soon as the declaring function runs.
    with pytest.raises(TypeError):
        declare_bad_signalemitter()
173