1from typing import Tuple, Any, List
2import cirq
3import pytest
4from pyquil import Program
5from pyquil.api import QuantumComputer
6import numpy as np
7from pyquil.gates import MEASURE, RX, X, DECLARE, H, CNOT
8from cirq_rigetti import RigettiQCSService
9from typing_extensions import Protocol
10from cirq_rigetti import circuit_transformers as transformers
11from cirq_rigetti import circuit_sweep_executors as executors
12
13
14_default_executor = executors.with_quilc_compilation_and_cirq_parameter_resolution
15
16
17class _ResultBuilder(Protocol):
18    def __call__(
19        self,
20        mock_qpu_implementer: Any,
21        circuit: cirq.Circuit,
22        sweepable: cirq.Sweepable,
23        *,
24        executor: executors.CircuitSweepExecutor = _default_executor,
25        transformer: transformers.CircuitTransformer = transformers.default,
26    ) -> Tuple[
27        List[cirq.Result],
28        QuantumComputer,
29        List["np.ndarray[Any, np.dtype[np.float_]]"],
30        List[cirq.ParamResolver],
31    ]:
32        pass
33
34
35def _build_service_results(
36    mock_qpu_implementer: Any,
37    circuit: cirq.Circuit,
38    sweepable: cirq.Sweepable,
39    *,
40    executor: executors.CircuitSweepExecutor = _default_executor,
41    transformer: transformers.CircuitTransformer = transformers.default,
42) -> Tuple[
43    List[cirq.Result],
44    QuantumComputer,
45    List["np.ndarray[Any, np.dtype[np.float_]]"],
46    List[cirq.ParamResolver],
47]:
48    repetitions = 2
49    param_resolvers = [r for r in cirq.to_resolvers(sweepable)]
50    param_resolver_index = min(1, len(param_resolvers) - 1)
51    param_resolver = param_resolvers[param_resolver_index]
52
53    expected_results = [
54        np.ones((repetitions,))
55        * (param_resolver["t"] if "t" in param_resolver else param_resolver_index)
56    ]
57    quantum_computer = mock_qpu_implementer.implement_passive_quantum_computer_with_results(
58        expected_results
59    )
60    service = RigettiQCSService(
61        quantum_computer=quantum_computer,
62        executor=executor,
63        transformer=transformer,
64    )
65
66    result = service.run(
67        circuit=circuit,
68        param_resolver=param_resolver,
69        repetitions=repetitions,
70    )
71    return [result], quantum_computer, expected_results, [param_resolver]
72
73
74def _build_sampler_results(
75    mock_qpu_implementer: Any,
76    circuit: cirq.Circuit,
77    sweepable: cirq.Sweepable,
78    *,
79    executor: executors.CircuitSweepExecutor = _default_executor,
80    transformer: transformers.CircuitTransformer = transformers.default,
81) -> Tuple[
82    List[cirq.Result],
83    QuantumComputer,
84    List["np.ndarray[Any, np.dtype[np.float_]]"],
85    cirq.Sweepable,
86]:
87    repetitions = 2
88
89    param_resolvers = [r for r in cirq.to_resolvers(sweepable)]
90    expected_results = [
91        np.ones((repetitions,)) * (params["t"] if "t" in params else i)
92        for i, params in enumerate(param_resolvers)
93    ]
94    quantum_computer = mock_qpu_implementer.implement_passive_quantum_computer_with_results(
95        expected_results
96    )
97    service = RigettiQCSService(
98        quantum_computer=quantum_computer,
99        executor=executor,
100        transformer=transformer,
101    )
102
103    sampler = service.sampler()
104
105    results = sampler.run_sweep(
106        program=circuit,
107        params=param_resolvers,
108        repetitions=repetitions,
109    )
110    return results, quantum_computer, expected_results, param_resolvers
111
112
113@pytest.mark.parametrize("result_builder", [_build_service_results, _build_sampler_results])
114def test_parametric_circuit(
115    mock_qpu_implementer: Any,
116    parametric_circuit_with_params: Tuple[cirq.Circuit, cirq.Sweepable],
117    result_builder: _ResultBuilder,
118) -> None:
119    """test that RigettiQCSService and RigettiQCSSampler can run a parametric
120    circuit with a specified set of parameters and return expected cirq.Results.
121    """
122
123    parametric_circuit = parametric_circuit_with_params[0]
124    sweepable = parametric_circuit_with_params[1]
125    results, quantum_computer, expected_results, param_resolvers = result_builder(
126        mock_qpu_implementer, parametric_circuit, sweepable
127    )
128
129    assert len(param_resolvers) == len(
130        results
131    ), "should return a result for every element in sweepable"
132
133    for i, param_resolver in enumerate(param_resolvers):
134        result = results[i]
135        assert param_resolver == result.params
136        assert np.allclose(
137            result.measurements["m"],
138            expected_results[i],
139        ), "should return an ordered list of results with correct set of measurements"
140
141    def test_executable(i: int, program: Program) -> None:
142        params = param_resolvers[i]
143        t = params["t"]
144        if t == 1:
145            assert (
146                X(0) in program.instructions
147            ), f"executable should contain an X(0) instruction at {i}"
148        else:
149            assert (
150                RX(np.pi * t, 0) in program.instructions
151            ), f"executable should contain an RX(pi*{t}) 0 instruction at {i}"
152        assert DECLARE("m0") in program.instructions, "executable should declare a read out bit"
153        assert (
154            MEASURE(0, ("m0", 0)) in program.instructions
155        ), "executable should measure the read out bit"
156
157    param_sweeps = len(param_resolvers)
158    assert param_sweeps == quantum_computer.compiler.quil_to_native_quil.call_count  # type: ignore
159    for i, call_args in enumerate(
160        quantum_computer.compiler.quil_to_native_quil.call_args_list  # type: ignore
161    ):
162        test_executable(i, call_args[0][0])
163
164    assert (
165        param_sweeps
166        == quantum_computer.compiler.native_quil_to_executable.call_count  # type: ignore
167    )
168    for i, call_args in enumerate(
169        quantum_computer.compiler.native_quil_to_executable.call_args_list  # type: ignore
170    ):
171        test_executable(i, call_args[0][0])
172
173    assert param_sweeps == quantum_computer.qam.run.call_count  # type: ignore
174    for i, call_args in enumerate(quantum_computer.qam.run.call_args_list):  # type: ignore
175        test_executable(i, call_args[0][0])
176
177
178@pytest.mark.parametrize("result_builder", [_build_service_results, _build_sampler_results])
179def test_bell_circuit(
180    mock_qpu_implementer: Any,
181    bell_circuit: cirq.Circuit,
182    result_builder: _ResultBuilder,
183) -> None:
184    """test that RigettiQCSService and RigettiQCSSampler can run a basic Bell circuit
185    with two read out bits and return expected cirq.Results.
186    """
187
188    param_resolvers = [cirq.ParamResolver({})]
189    results, quantum_computer, expected_results, param_resolvers = result_builder(
190        mock_qpu_implementer, bell_circuit, param_resolvers
191    )
192
193    assert len(param_resolvers) == len(
194        results
195    ), "should return a result for every element in sweepable"
196
197    for i, param_resolver in enumerate(param_resolvers):
198        result = results[i]
199        assert param_resolver == result.params
200        assert np.allclose(
201            result.measurements["m"],
202            expected_results[i],
203        ), "should return an ordered list of results with correct set of measurements"
204
205    def test_executable(program: Program) -> None:
206        assert H(0) in program.instructions, "bell circuit should include Hadamard"
207        assert CNOT(0, 1) in program.instructions, "bell circuit should include CNOT"
208        assert (
209            DECLARE("m0", memory_size=2) in program.instructions
210        ), "executable should declare a read out bit"
211        assert (
212            MEASURE(0, ("m0", 0)) in program.instructions
213        ), "executable should measure the first qubit to the first read out bit"
214        assert (
215            MEASURE(1, ("m0", 1)) in program.instructions
216        ), "executable should measure the second qubit to the second read out bit"
217
218    param_sweeps = len(param_resolvers)
219    assert param_sweeps == quantum_computer.compiler.quil_to_native_quil.call_count  # type: ignore
220    for i, call_args in enumerate(
221        quantum_computer.compiler.quil_to_native_quil.call_args_list  # type: ignore
222    ):
223        test_executable(call_args[0][0])
224
225    assert (
226        param_sweeps
227        == quantum_computer.compiler.native_quil_to_executable.call_count  # type: ignore
228    )
229    for i, call_args in enumerate(
230        quantum_computer.compiler.native_quil_to_executable.call_args_list  # type: ignore
231    ):
232        test_executable(call_args[0][0])
233
234    assert param_sweeps == quantum_computer.qam.run.call_count  # type: ignore
235    for i, call_args in enumerate(quantum_computer.qam.run.call_args_list):  # type: ignore
236        test_executable(call_args[0][0])
237
238
239@pytest.mark.parametrize("result_builder", [_build_service_results, _build_sampler_results])
240def test_explicit_qubit_id_map(
241    mock_qpu_implementer: Any,
242    bell_circuit_with_qids: Tuple[cirq.Circuit, List[cirq.LineQubit]],
243    result_builder: _ResultBuilder,
244) -> None:
245    """test that RigettiQCSService and RigettiQCSSampler accept explicit ``qubit_id_map``
246    to map ``cirq.Qid`` s to physical qubits.
247    """
248    bell_circuit, qubits = bell_circuit_with_qids
249
250    qubit_id_map = {
251        qubits[1]: "11",
252        qubits[0]: "13",
253    }
254    param_resolvers = [cirq.ParamResolver({})]
255    results, quantum_computer, expected_results, param_resolvers = result_builder(
256        mock_qpu_implementer,
257        bell_circuit,
258        param_resolvers,
259        transformer=transformers.build(
260            qubit_id_map=qubit_id_map,  # type: ignore
261        ),
262    )
263
264    assert len(param_resolvers) == len(
265        results
266    ), "should return a result for every element in sweepable"
267
268    for i, param_resolver in enumerate(param_resolvers):
269        result = results[i]
270        assert param_resolver == result.params
271        assert np.allclose(
272            result.measurements["m"],
273            expected_results[i],
274        ), "should return an ordered list of results with correct set of measurements"
275
276    def test_executable(program: Program) -> None:
277        assert H(13) in program.instructions, "bell circuit should include Hadamard"
278        assert CNOT(13, 11) in program.instructions, "bell circuit should include CNOT"
279        assert (
280            DECLARE("m0", memory_size=2) in program.instructions
281        ), "executable should declare a read out bit"
282        assert (
283            MEASURE(13, ("m0", 0)) in program.instructions
284        ), "executable should measure the first qubit to the first read out bit"
285        assert (
286            MEASURE(11, ("m0", 1)) in program.instructions
287        ), "executable should measure the second qubit to the second read out bit"
288
289    param_sweeps = len(param_resolvers)
290    assert param_sweeps == quantum_computer.compiler.quil_to_native_quil.call_count  # type: ignore
291    for i, call_args in enumerate(
292        quantum_computer.compiler.quil_to_native_quil.call_args_list  # type: ignore
293    ):
294        test_executable(call_args[0][0])
295
296    assert (
297        param_sweeps
298        == quantum_computer.compiler.native_quil_to_executable.call_count  # type: ignore
299    )
300    for i, call_args in enumerate(
301        quantum_computer.compiler.native_quil_to_executable.call_args_list  # type: ignore
302    ):
303        test_executable(call_args[0][0])
304
305    assert param_sweeps == quantum_computer.qam.run.call_count  # type: ignore
306    for i, call_args in enumerate(quantum_computer.qam.run.call_args_list):  # type: ignore
307        test_executable(call_args[0][0])
308
309
310@pytest.mark.parametrize("result_builder", [_build_service_results, _build_sampler_results])
311def test_run_without_quilc_compilation(
312    mock_qpu_implementer: Any,
313    bell_circuit: cirq.Circuit,
314    result_builder: _ResultBuilder,
315) -> None:
316    """test that RigettiQCSService and RigettiQCSSampler allow users to execute
317    without using quilc to compile to native Quil.
318    """
319    param_resolvers = [cirq.ParamResolver({})]
320    results, quantum_computer, expected_results, param_resolvers = result_builder(
321        mock_qpu_implementer,
322        bell_circuit,
323        param_resolvers,
324        executor=executors.without_quilc_compilation,
325    )
326
327    assert len(param_resolvers) == len(
328        results
329    ), "should return a result for every element in sweepable"
330
331    for i, param_resolver in enumerate(param_resolvers):
332        result = results[i]
333        assert param_resolver == result.params
334        assert np.allclose(
335            result.measurements["m"],
336            expected_results[i],
337        ), "should return an ordered list of results with correct set of measurements"
338
339    def test_executable(program: Program) -> None:
340        assert H(0) in program.instructions, "bell circuit should include Hadamard"
341        assert CNOT(0, 1) in program.instructions, "bell circuit should include CNOT"
342        assert (
343            DECLARE("m0", memory_size=2) in program.instructions
344        ), "executable should declare a read out bit"
345        assert (
346            MEASURE(0, ("m0", 0)) in program.instructions
347        ), "executable should measure the first qubit to the first read out bit"
348        assert (
349            MEASURE(1, ("m0", 1)) in program.instructions
350        ), "executable should measure the second qubit to the second read out bit"
351
352    assert 0 == quantum_computer.compiler.quil_to_native_quil.call_count  # type: ignore
353
354    param_sweeps = len(param_resolvers)
355    assert (
356        param_sweeps
357        == quantum_computer.compiler.native_quil_to_executable.call_count  # type: ignore
358    )
359    for i, call_args in enumerate(
360        quantum_computer.compiler.native_quil_to_executable.call_args_list  # type: ignore
361    ):
362        test_executable(call_args[0][0])
363
364    assert param_sweeps == quantum_computer.qam.run.call_count  # type: ignore
365    for i, call_args in enumerate(quantum_computer.qam.run.call_args_list):  # type: ignore
366        test_executable(call_args[0][0])
367