1# Copyright 2018 The Cirq Developers
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#         https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from typing import Callable, List, Optional, Tuple, Set, Any, TYPE_CHECKING
16
17import numpy as np
18
19import cirq
20from cirq_google.line.placement import place_strategy, optimization
21from cirq_google.line.placement.chip import (
22    above,
23    right_of,
24    chip_as_adjacency_list,
25    EDGE,
26)
27from cirq_google.line.placement.sequence import GridQubitLineTuple, LineSequence
28
29if TYPE_CHECKING:
30    import cirq_google
31
32_STATE = Tuple[List[List[cirq.GridQubit]], Set[EDGE]]
33
34
35class AnnealSequenceSearch:
36    """Simulated annealing search heuristic."""
37
38    def __init__(self, device: 'cirq_google.XmonDevice', seed=None) -> None:
39        """Greedy sequence search constructor.
40
41        Args:
42          device: Chip description.
43          seed: Optional seed value for random number generator.
44        """
45        self._c = device.qubits
46        self._c_adj = chip_as_adjacency_list(device)
47        self._rand = np.random.RandomState(seed)
48
49    def search(
50        self, trace_func: Callable[[List[LineSequence], float, float, float, bool], None] = None
51    ) -> List[LineSequence]:
52        """Issues new linear sequence search.
53
54        Each call to this method starts new search.
55
56        Args:
57          trace_func: Optional callable which will be called for each simulated
58            annealing step with arguments: solution candidate (list of linear
59            sequences on the chip), current temperature (float), candidate cost
60            (float), probability of accepting candidate (float), and acceptance
61            decision (boolean).
62
63        Returns:
64          List of linear sequences on the chip found by this method.
65        """
66
67        def search_trace(
68            state: _STATE, temp: float, cost: float, probability: float, accepted: bool
69        ):
70            if trace_func:
71                trace_seqs, _ = state
72                trace_func(trace_seqs, temp, cost, probability, accepted)
73
74        seqs, _ = optimization.anneal_minimize(
75            self._create_initial_solution(),
76            self._quadratic_sum_cost,
77            self._force_edges_active_move,
78            self._rand.random_sample,
79            trace_func=search_trace,
80        )
81        return seqs
82
83    def _quadratic_sum_cost(self, state: _STATE) -> float:
84        """Cost function that sums squares of lengths of sequences.
85
86        Args:
87          state: Search state, not mutated.
88
89        Returns:
90          Cost which is minus the normalized quadratic sum of each linear
91          sequence section in the state. This promotes single, long linear
92          sequence solutions and converges to number -1. The solution with a
93          lowest cost consists of every node being a single sequence and is
94          always less than 0.
95        """
96        cost = 0.0
97        total_len = float(len(self._c))
98        seqs, _ = state
99        for seq in seqs:
100            cost += (len(seq) / total_len) ** 2
101        return -cost
102
103    def _force_edges_active_move(self, state: _STATE) -> _STATE:
104        """Move function which repeats _force_edge_active_move a few times.
105
106        Args:
107          state: Search state, not mutated.
108
109        Returns:
110          New search state which consists of incremental changes of the
111          original state.
112        """
113        for _ in range(self._rand.randint(1, 4)):
114            state = self._force_edge_active_move(state)
115        return state
116
117    def _force_edge_active_move(self, state: _STATE) -> _STATE:
118        """Move which forces a random edge to appear on some sequence.
119
120        This move chooses random edge from the edges which do not belong to any
121        sequence and modifies state in such a way, that this chosen edge
122        appears on some sequence of the search state.
123
124        Args:
125          state: Search state, not mutated.
126
127        Returns:
128          New search state with one of the unused edges appearing in some
129          sequence.
130        """
131        seqs, edges = state
132        unused_edges = edges.copy()
133
134        # List edges which do not belong to any linear sequence.
135        for seq in seqs:
136            for i in range(1, len(seq)):
137                unused_edges.remove(self._normalize_edge((seq[i - 1], seq[i])))
138
139        edge = self._choose_random_edge(unused_edges)
140        if not edge:
141            return seqs, edges
142
143        return (self._force_edge_active(seqs, edge, lambda: bool(self._rand.randint(2))), edges)
144
145    def _force_edge_active(
146        self, seqs: List[List[cirq.GridQubit]], edge: EDGE, sample_bool: Callable[[], bool]
147    ) -> List[List[cirq.GridQubit]]:
148        """Move which forces given edge to appear on some sequence.
149
150        Args:
151          seqs: List of linear sequences covering chip.
152          edge: Edge to be activated.
153          sample_bool: Callable returning random bool.
154
155        Returns:
156          New list of linear sequences with given edge on some of the
157          sequences.
158        """
159
160        n0, n1 = edge
161
162        # Make a copy of original sequences.
163        seqs = list(seqs)
164
165        # Localize edge nodes within current solution.
166        i0, j0 = index_2d(seqs, n0)
167        i1, j1 = index_2d(seqs, n1)
168        s0 = seqs[i0]
169        s1 = seqs[i1]
170
171        # Handle case when nodes belong to different linear sequences,
172        # separately from the case where they belong to a single linear
173        # sequence.
174        if i0 != i1:
175
176            # Split s0 and s1 in two parts: s0 in parts before n0, and after n0
177            # (without n0); s1 in parts before n1, and after n1 (without n1).
178            part = [s0[:j0], s0[j0 + 1 :]], [s1[:j1], s1[j1 + 1 :]]
179
180            # Remove both sequences from original list.
181            del seqs[max(i0, i1)]
182            del seqs[min(i0, i1)]
183
184            # Choose part of s0 which will be attached to n0, and make sure it
185            # can be attached in the end.
186            c0 = 0 if not part[0][1] else 1 if not part[0][0] else sample_bool()
187            if c0:
188                part[0][c0].reverse()
189
190            # Choose part of s1 which will be attached to n1, and make sure it
191            # can be attached in the beginning.
192            c1 = 0 if not part[1][1] else 1 if not part[1][0] else sample_bool()
193            if not c1:
194                part[1][c1].reverse()
195
196            # Append newly formed sequence from the chosen parts and new edge.
197            seqs.append(part[0][c0] + [n0, n1] + part[1][c1])
198
199            # Append the left-overs to the solution, if they exist.
200            other = [1, 0]
201            seqs.append(part[0][other[c0]])
202            seqs.append(part[1][other[c1]])
203        else:
204            # Swap nodes so that n0 always precedes n1 on sequence.
205            if j0 > j1:
206                j0, j1 = j1, j0
207                n0, n1 = n1, n0
208
209            # Split sequence in three parts, without nodes n0 an n1 present:
210            # head might end with n0, inner might begin with n0 and end with
211            # n1, tail might begin with n1.
212            head = s0[:j0]
213            inner = s0[j0 + 1 : j1]
214            tail = s0[j1 + 1 :]
215
216            # Remove original sequence from sequences list.
217            del seqs[i0]
218
219            # Either append edge to inner section, or attach it between head
220            # and tail.
221            if sample_bool():
222                # Append edge either before or after inner section.
223                if sample_bool():
224                    seqs.append(inner + [n1, n0] + head[::-1])
225                    seqs.append(tail)
226                else:
227                    seqs.append(tail[::-1] + [n1, n0] + inner)
228                    seqs.append(head)
229            else:
230                # Form a new sequence from head, tail, and new edge.
231                seqs.append(head + [n0, n1] + tail)
232                seqs.append(inner)
233
234        return [e for e in seqs if e]
235
236    def _create_initial_solution(self) -> _STATE:
237        """Creates initial solution based on the chip description.
238
239        Initial solution is constructed in a greedy way.
240
241        Returns:
242          Valid search state.
243        """
244
245        def extract_sequences() -> List[List[cirq.GridQubit]]:
246            """Creates list of sequences for initial state.
247
248            Returns:
249              List of lists of sequences constructed on the chip.
250            """
251            seqs = []
252            prev = None
253            seq = None
254            for node in self._c:
255                if prev is None:
256                    seq = [node]
257                else:
258                    if node in self._c_adj[prev]:
259                        # Expand current sequence.
260                        seq.append(node)
261                    else:
262                        # Create new sequence, there is no connection between
263                        # nodes.
264                        seqs.append(seq)
265                        seq = [node]
266                prev = node
267            if seq:
268                seqs.append(seq)
269            return seqs
270
271        def assemble_edges() -> Set[EDGE]:
272            """Creates list of edges for initial state.
273
274            Returns:
275              List of all possible edges.
276            """
277            nodes_set = set(self._c)
278            edges = set()
279            for n in self._c:
280                if above(n) in nodes_set:
281                    edges.add(self._normalize_edge((n, above(n))))
282                if right_of(n) in nodes_set:
283                    edges.add(self._normalize_edge((n, right_of(n))))
284            return edges
285
286        return extract_sequences(), assemble_edges()
287
288    def _normalize_edge(self, edge: EDGE) -> EDGE:
289        """Gives unique representative of the edge.
290
291        Two edges are equivalent if they form an edge between the same nodes.
292        This method returns representative of this edge which can be compared
293        using equality operator later.
294
295        Args:
296          edge: Edge to normalize.
297
298        Returns:
299          Normalized edge with lexicographically lower node on the first
300          position.
301        """
302
303        def lower(n: cirq.GridQubit, m: cirq.GridQubit) -> bool:
304            return n.row < m.row or (n.row == m.row and n.col < m.col)
305
306        n1, n2 = edge
307        return (n1, n2) if lower(n1, n2) else (n2, n1)
308
309    def _choose_random_edge(self, edges: Set[EDGE]) -> Optional[EDGE]:
310        """Picks random edge from the set of edges.
311
312        Args:
313          edges: Set of edges to pick from.
314
315        Returns:
316          Random edge from the supplied set, or None for empty set.
317        """
318        if edges:
319            index = self._rand.randint(len(edges))
320            for e in edges:
321                if not index:
322                    return e
323                index -= 1
324        return None
325
326
327class AnnealSequenceSearchStrategy(place_strategy.LinePlacementStrategy):
328    """Linearized sequence search using simulated annealing method.
329
330    TODO: This line search strategy is still work in progress and requires
331    efficiency improvements.
332    Github issue: https://github.com/quantumlib/Cirq/issues/2217
333    """
334
335    def __init__(
336        self,
337        trace_func: Callable[[List[LineSequence], float, float, float, bool], None] = None,
338        seed: int = None,
339    ) -> None:
340        """Linearized sequence search using simulated annealing method.
341
342        Args:
343            trace_func: Optional callable which will be called for each
344                        simulated annealing step with arguments: solution
345                        candidate (list of linear sequences on the chip),
346                        current temperature (float), candidate cost (float),
347                        probability of accepting candidate (float), and
348                        acceptance decision (boolean).
349            seed: Optional seed value for random number generator.
350
351        Returns:
352            List of linear sequences on the chip found by simulated annealing
353            method.
354        """
355        self.trace_func = trace_func
356        self.seed = seed
357
358    def place_line(self, device: 'cirq_google.XmonDevice', length: int) -> GridQubitLineTuple:
359        """Runs line sequence search.
360
361        Args:
362            device: Chip description.
363            length: Required line length.
364
365        Returns:
366            List of linear sequences on the chip found by simulated annealing
367            method.
368        """
369        seqs = AnnealSequenceSearch(device, self.seed).search(self.trace_func)
370        return GridQubitLineTuple.best_of(seqs, length)
371
372
373def index_2d(seqs: List[List[Any]], target: Any) -> Tuple[int, int]:
374    """Finds the first index of a target item within a list of lists.
375
376    Args:
377        seqs: The list of lists to search.
378        target: The item to find.
379
380    Raises:
381        ValueError: Item is not present.
382    """
383    for i in range(len(seqs)):
384        for j in range(len(seqs[i])):
385            if seqs[i][j] == target:
386                return i, j
387    raise ValueError('Item not present.')
388