# Copyright 2020 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

from typing import Iterable, List, Optional, TYPE_CHECKING, Union
from pytz import utc

import cirq
from cirq_google.engine.client.quantum import types as qtypes
from cirq_google.engine.client.quantum import enums as qenums
from cirq_google.api import v2
from cirq_google.devices import serializable_device
from cirq_google.engine import calibration
from cirq_google.engine.engine_timeslot import EngineTimeSlot
from cirq_google.serialization.serializable_gate_set import SerializableGateSet

if TYPE_CHECKING:
    import cirq_google.engine.engine as engine_base


class EngineProcessor:
    """A processor available via the Quantum Engine API.

    Attributes:
        project_id: A project_id of the parent Google Cloud Project.
        processor_id: Unique ID of the processor.
    """

    def __init__(
        self,
        project_id: str,
        processor_id: str,
        context: 'engine_base.EngineContext',
        _processor: Optional[qtypes.QuantumProcessor] = None,
    ) -> None:
        """A processor available via the engine.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: Unique ID of the processor.
            context: Engine configuration and context to use.
            _processor: The optional current processor state.
        """
        self.project_id = project_id
        self.processor_id = processor_id
        self.context = context
        self._processor = _processor

    def engine(self) -> 'engine_base.Engine':
        """Returns the parent Engine object.

        Returns:
            The processor's parent Engine.
        """
        # Imported here rather than at module level; the module-level import
        # is only performed under TYPE_CHECKING.
        import cirq_google.engine.engine as engine_base

        return engine_base.Engine(self.project_id, context=self.context)

    def _inner_processor(self) -> qtypes.QuantumProcessor:
        """Returns the cached processor state, fetching it on first use."""
        if not self._processor:
            self._processor = self.context.client.get_processor(self.project_id, self.processor_id)
        return self._processor

    def health(self) -> str:
        """Returns the current health of processor.

        Unlike the other accessors, this always re-fetches the processor from
        the client (and updates the cached state) instead of reusing the cache.
        """
        self._processor = self.context.client.get_processor(self.project_id, self.processor_id)
        return qtypes.QuantumProcessor.Health.Name(self._processor.health)

    def expected_down_time(self) -> 'Optional[datetime.datetime]':
        """Returns the start of the next expected down time of the processor, if
        set."""
        if self._inner_processor().HasField('expected_down_time'):
            return self._inner_processor().expected_down_time.ToDatetime()
        else:
            return None

    def expected_recovery_time(self) -> 'Optional[datetime.datetime]':
        """Returns the time the processor is expected to be available again,
        if set."""
        if self._inner_processor().HasField('expected_recovery_time'):
            return self._inner_processor().expected_recovery_time.ToDatetime()
        else:
            return None

    def supported_languages(self) -> List[str]:
        """Returns the list of processor supported program languages."""
        return self._inner_processor().supported_languages

    def get_device_specification(self) -> Optional[v2.device_pb2.DeviceSpecification]:
        """Returns a device specification proto for use in determining
        information about the device.

        Returns:
            Device specification proto if present.
        """
        if self._inner_processor().HasField('device_spec'):
            return v2.device_pb2.DeviceSpecification.FromString(
                self._inner_processor().device_spec.value
            )
        else:
            return None

    def get_device(self, gate_sets: Iterable[SerializableGateSet]) -> cirq.Device:
        """Returns a `Device` created from the processor's device specification.

        This method queries the processor to retrieve the device specification,
        which is then used to create a `SerializableDevice` that will validate
        that operations are supported and use the correct qubits.

        Args:
            gate_sets: Gate sets passed through to
                `SerializableDevice.from_proto` along with the specification.

        Raises:
            ValueError: If the processor has no device specification.
        """
        spec = self.get_device_specification()
        if not spec:
            raise ValueError('Processor does not have a device specification')
        return serializable_device.SerializableDevice.from_proto(spec, gate_sets)

    def list_calibrations(
        self,
        earliest_timestamp_seconds: Optional[int] = None,
        latest_timestamp_seconds: Optional[int] = None,
    ) -> List[calibration.Calibration]:
        """Retrieve metadata about the calibration runs of this processor.

        Args:
            earliest_timestamp_seconds: The earliest timestamp of a calibration
                to return in UTC. None omits this bound.
            latest_timestamp_seconds: The latest timestamp of a calibration to
                return in UTC. None omits this bound.

        Returns:
            The list of calibration data with the most recent first.
        """
        # Compare against None explicitly so that a bound of 0 is still applied
        # instead of being treated as "not provided".
        filters = []
        if earliest_timestamp_seconds is not None:
            filters.append('timestamp >= %d' % earliest_timestamp_seconds)
        if latest_timestamp_seconds is not None:
            filters.append('timestamp <= %d' % latest_timestamp_seconds)
        filter_str = ' AND '.join(filters)
        response = self.context.client.list_calibrations(
            self.project_id, self.processor_id, filter_str
        )
        return [_to_calibration(c.data) for c in response]

    def get_calibration(self, calibration_timestamp_seconds: int) -> calibration.Calibration:
        """Retrieve metadata about a specific calibration run.

        Args:
            calibration_timestamp_seconds: The timestamp of the calibration in
                seconds since epoch.

        Returns:
            The calibration data.
        """
        response = self.context.client.get_calibration(
            self.project_id, self.processor_id, calibration_timestamp_seconds
        )
        return _to_calibration(response.data)

    def get_current_calibration(self) -> Optional[calibration.Calibration]:
        """Returns metadata about the current calibration for a processor.

        Returns:
            The calibration data or None if there is no current calibration.
        """
        response = self.context.client.get_current_calibration(self.project_id, self.processor_id)
        if response:
            return _to_calibration(response.data)
        else:
            return None

    def create_reservation(
        self,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        whitelisted_users: Optional[List[str]] = None,
    ):
        """Creates a reservation on this processor.

        Args:
            start_time: the starting date/time of the reservation.
            end_time: the ending date/time of the reservation.
            whitelisted_users: a list of emails that are allowed
              to send programs during this reservation (in addition to users
              with permission "quantum.reservations.use" on the project).

        Returns:
            The response of the client's `create_reservation` call, unchanged.
        """
        response = self.context.client.create_reservation(
            self.project_id, self.processor_id, start_time, end_time, whitelisted_users
        )
        return response

    def _delete_reservation(self, reservation_id: str):
        """Delete a reservation.

        This will only work for reservations outside the processor's
        schedule freeze window.  If you are not sure whether the reservation
        falls within this window, use remove_reservation
        """
        return self.context.client.delete_reservation(
            self.project_id, self.processor_id, reservation_id
        )

    def _cancel_reservation(self, reservation_id: str):
        """Cancel a reservation.

        This will only work for reservations inside the processor's
        schedule freeze window.  If you are not sure whether the reservation
        falls within this window, use remove_reservation
        """
        return self.context.client.cancel_reservation(
            self.project_id, self.processor_id, reservation_id
        )

    def remove_reservation(self, reservation_id: str):
        """Removes a reservation, deleting or cancelling it as appropriate.

        The reservation is deleted when its start time is further away than the
        processor's schedule freeze period, and cancelled otherwise.

        Args:
            reservation_id: The id of the reservation to remove.

        Returns:
            The result of the underlying delete or cancel call.

        Raises:
            ValueError: If the reservation cannot be found, or if the
                processor's schedule freeze period cannot be determined.
        """
        reservation = self.get_reservation(reservation_id)
        if reservation is None:
            raise ValueError(f'Reservation id {reservation_id} not found.')
        proc = self._inner_processor()
        if proc:
            freeze = proc.schedule_frozen_period.seconds
        else:
            freeze = None
        # A missing processor and a zero/unset freeze period are both treated
        # as "unknown": the caller must then pick cancel vs. delete explicitly.
        if not freeze:
            raise ValueError(
                'Cannot determine freeze_schedule from processor. '
                'Call _cancel_reservation or _delete_reservation.'
            )
        secs_until = reservation.start_time.seconds - int(datetime.datetime.now(tz=utc).timestamp())
        if secs_until > freeze:
            return self._delete_reservation(reservation_id)
        else:
            return self._cancel_reservation(reservation_id)

    def get_reservation(self, reservation_id: str):
        """Retrieve a reservation given its id."""
        return self.context.client.get_reservation(
            self.project_id, self.processor_id, reservation_id
        )

    def update_reservation(
        self,
        reservation_id: str,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        whitelisted_users: Optional[List[str]] = None,
    ):
        """Updates a reservation with new information.

        Updates a reservation with a new start date, end date, or
        list of additional users.  For each field, if the argument is left as
        None, it will not be updated.

        Args:
            reservation_id: The id of the reservation to update.
            start_time: The new starting date/time, or None to leave unchanged.
            end_time: The new ending date/time, or None to leave unchanged.
            whitelisted_users: The new list of additional users, or None to
                leave unchanged.

        Returns:
            The response of the client's `update_reservation` call, unchanged.
        """
        return self.context.client.update_reservation(
            self.project_id,
            self.processor_id,
            reservation_id,
            start=start_time,
            end=end_time,
            whitelisted_users=whitelisted_users,
        )

    def list_reservations(
        self,
        from_time: Union[None, datetime.datetime, datetime.timedelta] = datetime.timedelta(),
        to_time: Union[None, datetime.datetime, datetime.timedelta] = datetime.timedelta(weeks=2),
    ) -> List[EngineTimeSlot]:
        """Retrieves the reservations from a processor.

        Only reservations from this processor and project will be
        returned. The schedule may be filtered by starting and ending time.

        Args:
            from_time: Filters the returned reservations to only include entries
                that end no earlier than the given value. Specified either as an
                absolute time (datetime.datetime) or as a time relative to now
                (datetime.timedelta). Defaults to now (a relative time of 0).
                Set to None to omit this filter.
            to_time: Filters the returned reservations to only include entries
                that start no later than the given value. Specified either as an
                absolute time (datetime.datetime) or as a time relative to now
                (datetime.timedelta). Defaults to two weeks from now (a relative
                time of two weeks). Set to None to omit this filter.

        Returns:
            A list of reservations.
        """
        filters = _to_date_time_filters(from_time, to_time)
        filter_str = ' AND '.join(filters)
        # NOTE(review): the client's result is returned unchanged; confirm that
        # it matches the annotated return type.
        return self.context.client.list_reservations(self.project_id, self.processor_id, filter_str)

    def get_schedule(
        self,
        from_time: Union[None, datetime.datetime, datetime.timedelta] = datetime.timedelta(),
        to_time: Union[None, datetime.datetime, datetime.timedelta] = datetime.timedelta(weeks=2),
        time_slot_type: Optional[qenums.QuantumTimeSlot.TimeSlotType] = None,
    ) -> List[EngineTimeSlot]:
        """Retrieves the schedule for a processor.

        The schedule may be filtered by time.

        Time slot type will be supported in the future.

        Args:
            from_time: Filters the returned schedule to only include entries
                that end no earlier than the given value. Specified either as an
                absolute time (datetime.datetime) or as a time relative to now
                (datetime.timedelta). Defaults to now (a relative time of 0).
                Set to None to omit this filter.
            to_time: Filters the returned schedule to only include entries
                that start no later than the given value. Specified either as an
                absolute time (datetime.datetime) or as a time relative to now
                (datetime.timedelta). Defaults to two weeks from now (a relative
                time of two weeks). Set to None to omit this filter.
            time_slot_type: Filters the returned schedule to only include
                entries with a given type (e.g. maintenance, open swim).
                Defaults to None. Set to None to omit this filter.

        Returns:
            Schedule time slots.
        """
        filters = _to_date_time_filters(from_time, to_time)
        if time_slot_type is not None:
            filters.append(f'time_slot_type = {time_slot_type.name}')
        filter_str = ' AND '.join(filters)
        return self.context.client.list_time_slots(self.project_id, self.processor_id, filter_str)

    def __str__(self):
        return (
            f"EngineProcessor(project_id={self.project_id!r}, "
            f"processor_id={self.processor_id!r})"
        )


def _to_calibration(calibration_any: qtypes.any_pb2.Any) -> calibration.Calibration:
    """Parses the `value` bytes of an Any proto as a MetricsSnapshot and wraps
    the result in a Calibration."""
    metrics = v2.metrics_pb2.MetricsSnapshot.FromString(calibration_any.value)
    return calibration.Calibration(metrics)


def _resolve_time(
    value: Union[None, datetime.datetime, datetime.timedelta],
    now: datetime.datetime,
    arg_name: str,
) -> Optional[datetime.datetime]:
    """Converts an absolute or relative-to-`now` time into an absolute time.

    None is passed through; a timedelta is added to `now`; a datetime is
    returned as is. Any other type raises a ValueError naming `arg_name`.
    """
    if value is None:
        return None
    if isinstance(value, datetime.timedelta):
        return now + value
    if isinstance(value, datetime.datetime):
        return value
    raise ValueError(f"Don't understand {arg_name} of type {type(value)}.")


def _to_date_time_filters(
    from_time: Union[None, datetime.datetime, datetime.timedelta],
    to_time: Union[None, datetime.datetime, datetime.timedelta],
) -> List[str]:
    """Builds the time-window filter clauses for schedule/reservation queries.

    Args:
        from_time: Lower bound, absolute (datetime) or relative to now
            (timedelta). None omits the `end_time` clause.
        to_time: Upper bound, absolute (datetime) or relative to now
            (timedelta). None omits the `start_time` clause.

    Returns:
        Zero, one or two filter clauses, with the `start_time` clause first.

    Raises:
        ValueError: If either argument is not None, a datetime or a timedelta.
    """
    now = datetime.datetime.now()

    # Both bounds are resolved against the same `now`, from_time first.
    start_time = _resolve_time(from_time, now, 'from_time')
    end_time = _resolve_time(to_time, now, 'to_time')

    filters: List[str] = []
    if end_time is not None:
        filters.append(f'start_time < {int(end_time.timestamp())}')
    if start_time is not None:
        filters.append(f'end_time > {int(start_time.timestamp())}')
    return filters