# Copyright 2019 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14"""A protocol for implementing high performance channel evolutions."""
15
16from typing import Any, Iterable, Optional, Sequence, TypeVar, Tuple, Union
17
18import numpy as np
19from typing_extensions import Protocol
20
21from cirq import linalg
22from cirq._doc import doc_private
23from cirq.protocols.apply_unitary_protocol import (
24    apply_unitary,
25    ApplyUnitaryArgs,
26)
27from cirq.protocols.kraus_protocol import kraus
28from cirq.protocols import qid_shape_protocol
29from cirq.type_workarounds import NotImplementedType
30
# This is a special indicator value used by the `apply_channel` function
# to determine whether or not the caller provided a 'default' argument. It
# must be of type np.ndarray to ensure the function has the correct type
# signature in that case. It is checked for using `is`, so it won't have a
# false positive if the user provides a different np.array([]) value.
RaiseTypeErrorIfNotProvided: np.ndarray = np.array([])

TDefault = TypeVar('TDefault')


class ApplyChannelArgs:
    r"""Arguments for efficiently performing a channel.

    A channel performs the mapping
        $$
        X \rightarrow \sum_i A_i X A_i^\dagger
        $$
    for operators $A_i$ that satisfy the normalization condition
        $$
        \sum_i A_i^\dagger A_i = I.
        $$
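
    For example, the bit flip channel with probability $p$ has Kraus
    operators $A_0 = \sqrt{1-p}\,I$ and $A_1 = \sqrt{p}\,X$; these satisfy
    the normalization condition since $X^\dagger X = I$.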

    The receiving object is expected to mutate `target_tensor` so that it
    contains the density matrix after multiplication, and then return
    `target_tensor`. Alternatively, if workspace is required,
    the receiving object can overwrite `out_buffer` with the results
    and return `out_buffer`. Or, if the receiving object is attempting to
    be simple instead of fast, it can create an entirely new array and
    return that.
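
    For example, a minimal sketch of setting up these arguments for a
    single-qubit density matrix (the buffer allocation shown is one
    possible choice):

        rho = np.array([[0.5, 0.5], [0.5, 0.5]], dtype=np.complex64)
        args = cirq.ApplyChannelArgs(
            target_tensor=rho,
            out_buffer=np.zeros_like(rho),
            auxiliary_buffer0=np.zeros_like(rho),
            auxiliary_buffer1=np.zeros_like(rho),
            left_axes=(0,),
            right_axes=(1,),
        )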

    Attributes:
        target_tensor: The input tensor that needs to be left and right
            multiplied and summed, representing the effect of the channel.
            The tensor will have the shape (2, 2, 2, ..., 2). It usually
            corresponds to a multi-qubit density matrix, with the first
            n indices corresponding to the rows of the density matrix and
            the last n indices corresponding to the columns of the density
            matrix.
        out_buffer: Pre-allocated workspace with the same shape and
            dtype as the target tensor. If buffers are used, the result should
            end up in this buffer. It is the responsibility of calling code
            to notice if the result is this buffer.
        auxiliary_buffer0: Pre-allocated workspace with the same shape and dtype
            as the target tensor.
        auxiliary_buffer1: Pre-allocated workspace with the same shape
            and dtype as the target tensor.
        left_axes: The row axes of `target_tensor` that the channel's left
            action (the $A_i$) is applied to.
        right_axes: The column axes of `target_tensor` that the channel's
            right action (the $A_i^\dagger$) is applied to.
    """

    def __init__(
        self,
        target_tensor: np.ndarray,
        out_buffer: np.ndarray,
        auxiliary_buffer0: np.ndarray,
        auxiliary_buffer1: np.ndarray,
        left_axes: Iterable[int],
        right_axes: Iterable[int],
    ):
91        """Args for apply channel.
92
93        Args:
94            target_tensor: The input tensor that needs to be left and right
95                multiplied and summed representing the effect of the channel.
96                The tensor will have the shape (2, 2, 2, ..., 2). It usually
97                corresponds to a multi-qubit density matrix, with the first
98                n indices corresponding to the rows of the density matrix and
99                the last n indices corresponding to the columns of the density
100                matrix.
101            out_buffer: Pre-allocated workspace with the same shape and
102                dtype as the target tensor. If buffers are used, the result
103                should end up in this buffer. It is the responsibility of
104                calling code to notice if the result is this buffer.
105            auxiliary_buffer0: Pre-allocated workspace with the same shape and
106                dtype as the target tensor.
107            auxiliary_buffer1: Pre-allocated workspace with the same shape
108                and dtype as the target tensor.
109            left_axes: Which axes to multiply the left action of the channel
110                upon.
111            right_axes: Which axes to multiply the right action of the channel
112                upon.
113        """
        self.target_tensor = target_tensor
        self.out_buffer = out_buffer
        self.auxiliary_buffer0 = auxiliary_buffer0
        self.auxiliary_buffer1 = auxiliary_buffer1
        self.left_axes = tuple(left_axes)
        self.right_axes = tuple(right_axes)


class SupportsApplyChannel(Protocol):
    """An object that can efficiently implement a channel."""

    @doc_private
    def _apply_channel_(
        self, args: ApplyChannelArgs
    ) -> Union[np.ndarray, None, NotImplementedType]:
129        """Efficiently applies a channel.
130
131        This method is given both the target tensor and workspace of the same
132        shape and dtype. The method then either performs inline modifications of
133        the target tensor and returns it, or writes its output into the
134        a workspace tensor and returns that. This signature makes it possible to
135        write specialized simulation methods that run without performing large
136        allocations, significantly increasing simulation performance.
137
138        Args:
139            args: A `cirq.ApplyChannelArgs` object with the `args.target_tensor`
140                to operate on, an `args.out_buffer`, 'args.auxiliary_buffer0`
141                and `args.auxiliary_buffer1` buffers to use as temporary
142                workspace, and the `args.left_axes` and `args.right_axes` of
143                the tensor to target with the unitary operation. Note that
144                this method is permitted (and in fact expected) to mutate
145                `args.target_tensor` and the given buffers.

        Returns:
            If the receiving object is not able to apply a channel, None
            or NotImplemented should be returned.

            If the receiving object is able to work inline, it should directly
            mutate `args.target_tensor` and then return `args.target_tensor`.
            The caller will understand this to mean that the result is in
            `args.target_tensor`.

            If the receiving object is unable to work inline, it can write its
            output over `args.out_buffer` and then return `args.out_buffer`.
            The caller will understand this to mean that the result is in
            `args.out_buffer` (and so what was `args.out_buffer` will become
            `args.target_tensor` in the next call, and vice versa).

            The receiving object is also permitted to allocate a new
            numpy.ndarray and return that as its result.
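
        Example:
            A minimal sketch of an inline implementation, for a channel
            that acts as the identity (purely illustrative):

                def _apply_channel_(self, args):
                    # The identity channel leaves the density matrix
                    # unchanged; returning `args.target_tensor` signals
                    # that the result was computed inline.
                    return args.target_tensor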
164        """
165
166
def apply_channel(
    val: Any, args: ApplyChannelArgs, default: TDefault = RaiseTypeErrorIfNotProvided
) -> Union[np.ndarray, TDefault]:
170    """High performance evolution under a channel evolution.
171
172    If `val` defines an `_apply_channel_` method, that method will be
173    used to apply `val`'s channel effect to the target tensor. Otherwise, if
174    `val` defines an `_apply_unitary_` method, that method will be used to
175    apply `val`s channel effect to the target tensor.  Otherwise, if `val`
176    returns a non-default channel with `cirq.channel`, that channel will be
177    applied using a generic method.  If none of these cases apply, an
178    exception is raised or the specified default value is returned.
179
180
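    For example, a usage sketch applying Cirq's depolarizing channel
    `cirq.depolarize` to a single-qubit density matrix (the buffer
    allocation shown is one possible choice):

        rho = np.array([[1, 0], [0, 0]], dtype=np.complex64)
        args = cirq.ApplyChannelArgs(
            target_tensor=rho,
            out_buffer=np.zeros_like(rho),
            auxiliary_buffer0=np.zeros_like(rho),
            auxiliary_buffer1=np.zeros_like(rho),
            left_axes=(0,),
            right_axes=(1,),
        )
        result = cirq.apply_channel(cirq.depolarize(0.1), args)

    The returned `result` may be `args.target_tensor`, `args.out_buffer`, or
    a newly allocated array; compare with `is` to determine which.
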
    Args:
        val: The value with a channel to apply to the target.
        args: A mutable `cirq.ApplyChannelArgs` object describing the target
            tensor, available workspace, and left and right axes to operate on.
            The attributes of this object will be mutated as part of computing
            the result.
        default: What should be returned if `val` doesn't have a channel. If
            not specified, a TypeError is raised instead of returning a default
            value.

    Returns:
        If the receiving object is not able to apply a channel,
        the specified default value is returned (or a TypeError is raised). If
        this occurs, then `target_tensor` should not have been mutated.

        If the receiving object was able to work inline, directly mutating
        `target_tensor`, it will return `target_tensor`. The caller is
        responsible for checking if the result is `target_tensor`.

        If the receiving object wrote its output over `out_buffer`, the
        result will be `out_buffer`. The caller is responsible for
        checking if the result is `out_buffer` (and e.g. swapping
        the buffer for the target tensor before the next call).

        Note that it is an error for the returned object to be either of the
        auxiliary buffers, and this function will raise an AssertionError if
        that contract is violated.

        The receiving object may also write its output over a new buffer
        that it created, in which case that new array is returned.

    Raises:
        TypeError: `val` doesn't have a channel and `default` wasn't specified.
        ValueError: The left and right shapes of `args.target_tensor` selected
            by `left_axes` and `right_axes` are not equal, or `qid_shape(val)`
            doesn't equal those shapes.
        AssertionError: `_apply_channel_` returned an auxiliary buffer.
    """
    # Verify that val has the same qid shape as the selected axes of the
    # density matrix tensor.
    val_qid_shape = qid_shape_protocol.qid_shape(val, (2,) * len(args.left_axes))
    left_shape = tuple(args.target_tensor.shape[i] for i in args.left_axes)
    right_shape = tuple(args.target_tensor.shape[i] for i in args.right_axes)
    if left_shape != right_shape:
        raise ValueError(
            'Invalid target_tensor shape or selected axes. '
            'The selected left and right shapes of target_tensor '
            'are not equal. Got {!r} and {!r}.'.format(left_shape, right_shape)
        )
    if val_qid_shape != left_shape:
        raise ValueError(
            'Invalid channel: qid shape is not equal to the '
            'selected left and right shape of target_tensor. '
            'Got {!r} but expected {!r}.'.format(val_qid_shape, left_shape)
        )

    # Check if the specialized method is present.
    func = getattr(val, '_apply_channel_', None)
    if func is not None:
        result = func(args)
        if result is not NotImplemented and result is not None:

            def err_str(buf_num_str):
                return (
                    "Object of type '{}' returned a result object equal to "
                    "auxiliary_buffer{}. This type violates the contract "
                    "that appears in apply_channel's documentation.".format(type(val), buf_num_str)
                )

            assert result is not args.auxiliary_buffer0, err_str('0')
            assert result is not args.auxiliary_buffer1, err_str('1')
            return result

    # Possibly use `apply_unitary`.
    result = _apply_unitary(val, args)
    if result is not None:
        return result

    # Fall back to using the object's `_kraus_` operators.
    ks = kraus(val, None)
    if ks is not None:
        return _apply_kraus(ks, args)

    # Don't know how to apply the channel. Fall back to the specified default
    # behavior.
    if default is not RaiseTypeErrorIfNotProvided:
        return default
    raise TypeError(
        "object of type '{}' has no _apply_channel_, _apply_unitary_, "
        "_unitary_, or _kraus_ methods (or they returned None or "
        "NotImplemented).".format(type(val))
    )


def _apply_unitary(val: Any, args: 'ApplyChannelArgs') -> Optional[np.ndarray]:
    """Attempt to use `apply_unitary` and return the result.

    Returns None if `val` does not support `apply_unitary`.
    """
    left_args = ApplyUnitaryArgs(
        target_tensor=args.target_tensor,
        available_buffer=args.auxiliary_buffer0,
        axes=args.left_axes,
    )
    left_result = apply_unitary(val, left_args, None)
    if left_result is None:
        return None
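    # The right action is obtained by conjugation: conj(U @ conj(U @ rho))
    # equals (U @ rho) @ U^dagger, so conjugate the intermediate result,
    # apply the unitary to the right axes, then conjugate back.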
    right_args = ApplyUnitaryArgs(
        target_tensor=np.conjugate(left_result),
        available_buffer=args.out_buffer,
        axes=args.right_axes,
    )
    right_result = apply_unitary(val, right_args)
    np.conjugate(right_result, out=right_result)
    return right_result


def _apply_kraus(
    kraus_ops: Union[Tuple[np.ndarray, ...], Sequence[Any]], args: 'ApplyChannelArgs'
) -> np.ndarray:
    """Directly apply the Kraus operators to the target tensor."""
    # Initialize the output to zero; each Kraus term is accumulated into it.
    args.out_buffer[:] = 0
    # Stash the initial state into auxiliary_buffer0 so that each Kraus
    # operator can be applied to a fresh copy.
    np.copyto(dst=args.auxiliary_buffer0, src=args.target_tensor)

    # Special case for single-qubit operations.
    if len(args.left_axes) == 1 and kraus_ops[0].shape == (2, 2):
        return _apply_kraus_single_qubit(kraus_ops, args)
    # Fall back to np.einsum for the general case.
    return _apply_kraus_multi_qubit(kraus_ops, args)


def _apply_kraus_single_qubit(
    kraus_ops: Union[Tuple[Any, ...], Sequence[Any]], args: 'ApplyChannelArgs'
) -> np.ndarray:
    """Use slicing to apply a single-qubit channel. Only for two-level qubits."""
    zero_left = linalg.slice_for_qubits_equal_to(args.left_axes, 0)
    one_left = linalg.slice_for_qubits_equal_to(args.left_axes, 1)
    zero_right = linalg.slice_for_qubits_equal_to(args.right_axes, 0)
    one_right = linalg.slice_for_qubits_equal_to(args.right_axes, 1)
    for kraus_op in kraus_ops:
        np.copyto(dst=args.target_tensor, src=args.auxiliary_buffer0)
        linalg.apply_matrix_to_slices(
            args.target_tensor, kraus_op, [zero_left, one_left], out=args.auxiliary_buffer1
        )
        # No need to transpose here: because we act on the tensor
        # representation of the matrix, the transpose is done for us.
        linalg.apply_matrix_to_slices(
            args.auxiliary_buffer1,
            np.conjugate(kraus_op),
            [zero_right, one_right],
            out=args.target_tensor,
        )
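        # Accumulate this term of sum_i A_i rho A_i^dagger into the output.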
        args.out_buffer += args.target_tensor
    return args.out_buffer


def _apply_kraus_multi_qubit(
    kraus_ops: Union[Tuple[Any, ...], Sequence[Any]], args: 'ApplyChannelArgs'
) -> np.ndarray:
    """Use numpy's einsum to apply a multi-qubit channel."""
    qid_shape = tuple(args.target_tensor.shape[i] for i in args.left_axes)
    for kraus_op in kraus_ops:
        np.copyto(dst=args.target_tensor, src=args.auxiliary_buffer0)
        kraus_tensor = np.reshape(kraus_op.astype(args.target_tensor.dtype), qid_shape * 2)
        linalg.targeted_left_multiply(
            kraus_tensor, args.target_tensor, args.left_axes, out=args.auxiliary_buffer1
        )
        # No need to transpose here: because we act on the tensor
        # representation of the matrix, the transpose is done for us.
        linalg.targeted_left_multiply(
            np.conjugate(kraus_tensor),
            args.auxiliary_buffer1,
            args.right_axes,
            out=args.target_tensor,
        )
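        # Accumulate this term of sum_i A_i rho A_i^dagger into the output.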
        args.out_buffer += args.target_tensor
    return args.out_buffer