# Copyright 2020 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time
from typing import Callable, Dict, List, Optional, Sequence, Set, TypeVar, Tuple, Union
import warnings

from google.api_core.exceptions import GoogleAPICallError, NotFound
from google.protobuf.timestamp_pb2 import Timestamp

from cirq_google.engine.client import quantum
from cirq_google.engine.client.quantum import types as qtypes

_R = TypeVar('_R')


class EngineException(Exception):
    """Raised by EngineClient when an API call fails with a non-retryable error."""

    def __init__(self, message):
        # Call the base class constructor with the parameters it needs
        super().__init__(message)


# Status codes (compared against `GoogleAPICallError.code.value`) for which a
# failed request is retried instead of being surfaced to the caller.
RETRYABLE_ERROR_CODES = [500, 503]


class EngineClient:
    """Client for the Quantum Engine API that deals with the engine protos and
    the gRPC client but not cirq protos or objects. All users are likely better
    served by using the Engine, EngineProgram, EngineJob, EngineProcessor, and
    Calibration objects instead of using this directly.
    """

    def __init__(
        self,
        service_args: Optional[Dict] = None,
        verbose: Optional[bool] = None,
        max_retry_delay_seconds: int = 3600,  # 1 hour
    ) -> None:
        """Engine service client.

        Args:
            service_args: A dictionary of arguments that can be used to
                configure options on the underlying gRPC client.
            verbose: Suppresses stderr messages when set to False. Default is
                True.
            max_retry_delay_seconds: The maximum number of seconds to retry when
                a retryable error code is returned.
        """
        self.max_retry_delay_seconds = max_retry_delay_seconds
        if verbose is None:
            verbose = True
        self.verbose = verbose

        if not service_args:
            service_args = {}

        # Suppress warnings about using Application Default Credentials.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            self.grpc_client = quantum.QuantumEngineServiceClient(**service_args)

    def _make_request(self, request: Callable[[], _R]) -> _R:
        """Runs `request`, retrying with exponential backoff on retryable errors.

        Args:
            request: A zero-argument callable that performs the API call.

        Returns:
            Whatever `request` returns.

        Raises:
            EngineException: If the call fails with an error whose code is not
                in RETRYABLE_ERROR_CODES. The original error is chained as the
                cause.
            TimeoutError: If the next retry delay would exceed
                `max_retry_delay_seconds`.
        """
        # Start with a 100ms retry delay with exponential backoff to
        # max_retry_delay_seconds
        current_delay = 0.1

        while True:
            try:
                return request()
            except GoogleAPICallError as err:
                message = err.message
                # `code` is None when the error has no mapped status; treat
                # that as non-retryable instead of failing on `.value`.
                code = getattr(err.code, 'value', err.code)
                # Raise EngineException for exceptions that are not retryable.
                # Otherwise, pass through to retry.
                if code not in RETRYABLE_ERROR_CODES:
                    raise EngineException(message) from err

            if current_delay > self.max_retry_delay_seconds:
                raise TimeoutError(f'Reached max retry attempts for error: {message}')
            if self.verbose:
                print(message, file=sys.stderr)
                print('Waiting ', current_delay, 'seconds before retrying.', file=sys.stderr)
            time.sleep(current_delay)
            current_delay *= 2

    def create_program(
        self,
        project_id: str,
        program_id: Optional[str],
        code: qtypes.any_pb2.Any,
        description: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
    ) -> Tuple[str, qtypes.QuantumProgram]:
        """Creates a Quantum Engine program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project. If
                empty or None, the program is created with an empty name.
            code: Properly serialized program code.
            description: An optional description to set on the program.
            labels: Optional set of labels to set on the program.

        Returns:
            Tuple of created program id and program
        """

        parent_name = _project_name(project_id)
        program_name = _program_name_from_ids(project_id, program_id) if program_id else ''
        request = qtypes.QuantumProgram(name=program_name, code=code)
        if description:
            request.description = description
        if labels:
            request.labels.update(labels)

        program = self._make_request(
            lambda: self.grpc_client.create_quantum_program(parent_name, request, False)
        )
        return _ids_from_program_name(program.name)[1], program

    def get_program(
        self, project_id: str, program_id: str, return_code: bool
    ) -> qtypes.QuantumProgram:
        """Returns a previously created quantum program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            return_code: If True returns the serialized program code.
        """
        return self._make_request(
            lambda: self.grpc_client.get_quantum_program(
                _program_name_from_ids(project_id, program_id), return_code
            )
        )

    def list_programs(
        self,
        project_id: str,
        created_before: Optional[Union[datetime.datetime, datetime.date]] = None,
        created_after: Optional[Union[datetime.datetime, datetime.date]] = None,
        has_labels: Optional[Dict[str, str]] = None,
    ):
        """Returns a list of previously executed quantum programs.

        Args:
            project_id: the id of the project
            created_after: retrieve programs that were created after this date
                or time.
            created_before: retrieve programs that were created before this date
                or time.
            has_labels: retrieve programs that have labels on them specified by
                this dict. If the value is set to `*`, programs having the label
                regardless of the label value will be returned. For example, to
                query programs that have the shape label and have the color
                label with value red, use

                {'color': 'red', 'shape':'*'}
        """
        filters = []

        if created_after is not None:
            val = _date_or_time_to_filter_expr('created_after', created_after)
            filters.append(f"create_time >= {val}")
        if created_before is not None:
            val = _date_or_time_to_filter_expr('created_before', created_before)
            filters.append(f"create_time <= {val}")
        if has_labels is not None:
            for (k, v) in has_labels.items():
                filters.append(f"labels.{k}:{v}")
        return self._make_request(
            lambda: self.grpc_client.list_quantum_programs(
                _project_name(project_id), filter_=" AND ".join(filters)
            )
        )

    def set_program_description(
        self, project_id: str, program_id: str, description: str
    ) -> qtypes.QuantumProgram:
        """Sets the description for a previously created quantum program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            description: The new program description.

        Returns:
            The updated quantum program.
        """
        program_resource_name = _program_name_from_ids(project_id, program_id)
        return self._make_request(
            lambda: self.grpc_client.update_quantum_program(
                program_resource_name,
                qtypes.QuantumProgram(name=program_resource_name, description=description),
                qtypes.field_mask_pb2.FieldMask(paths=['description']),
            )
        )

    def _set_program_labels(
        self, project_id: str, program_id: str, labels: Dict[str, str], fingerprint: str
    ) -> qtypes.QuantumProgram:
        """Sends an update that replaces a program's labels.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            labels: The entire set of new program labels.
            fingerprint: The label fingerprint sent along with the update.

        Returns:
            The updated quantum program.
        """
        program_resource_name = _program_name_from_ids(project_id, program_id)
        return self._make_request(
            lambda: self.grpc_client.update_quantum_program(
                program_resource_name,
                qtypes.QuantumProgram(
                    name=program_resource_name, labels=labels, label_fingerprint=fingerprint
                ),
                qtypes.field_mask_pb2.FieldMask(paths=['labels']),
            )
        )

    def set_program_labels(
        self, project_id: str, program_id: str, labels: Dict[str, str]
    ) -> qtypes.QuantumProgram:
        """Sets (overwriting) the labels for a previously created quantum
        program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            labels: The entire set of new program labels.

        Returns:
            The updated quantum program.
        """
        # Fetch the program first to obtain its current label fingerprint.
        program = self.get_program(project_id, program_id, False)
        return self._set_program_labels(project_id, program_id, labels, program.label_fingerprint)

    def add_program_labels(
        self, project_id: str, program_id: str, labels: Dict[str, str]
    ) -> qtypes.QuantumProgram:
        """Adds new labels to a previously created quantum program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            labels: New labels to add to the existing program labels.

        Returns:
            The updated quantum program, or the unmodified program if the
            labels would not change.
        """
        program = self.get_program(project_id, program_id, False)
        old_labels = program.labels
        new_labels = dict(old_labels)
        new_labels.update(labels)
        if new_labels != old_labels:
            fingerprint = program.label_fingerprint
            return self._set_program_labels(project_id, program_id, new_labels, fingerprint)
        return program

    def remove_program_labels(
        self, project_id: str, program_id: str, label_keys: List[str]
    ) -> qtypes.QuantumProgram:
        """Removes labels with given keys from the labels of a previously
        created quantum program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            label_keys: Label keys to remove from the existing program labels.

        Returns:
            The updated quantum program, or the unmodified program if the
            labels would not change.
        """
        program = self.get_program(project_id, program_id, False)
        old_labels = program.labels
        new_labels = dict(old_labels)
        for key in label_keys:
            new_labels.pop(key, None)
        if new_labels != old_labels:
            fingerprint = program.label_fingerprint
            return self._set_program_labels(project_id, program_id, new_labels, fingerprint)
        return program

    def delete_program(self, project_id: str, program_id: str, delete_jobs: bool = False) -> None:
        """Deletes a previously created quantum program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            delete_jobs: If True will delete all the program's jobs, otherwise
                this will fail if the program contains any jobs.
        """
        self._make_request(
            lambda: self.grpc_client.delete_quantum_program(
                _program_name_from_ids(project_id, program_id), delete_jobs
            )
        )

    def create_job(
        self,
        project_id: str,
        program_id: str,
        job_id: Optional[str],
        processor_ids: Sequence[str],
        run_context: qtypes.any_pb2.Any,
        priority: Optional[int] = None,
        description: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
    ) -> Tuple[str, qtypes.QuantumJob]:
        """Creates and runs a job on Quantum Engine.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program. If empty or
                None, the job is created with an empty name.
            processor_ids: List of processor id for running the program.
            run_context: Properly serialized run context.
            priority: Optional priority to run at, 0-1000.
            description: Optional description to set on the job.
            labels: Optional set of labels to set on the job.

        Returns:
            Tuple of created job id and job

        Raises:
            ValueError: If `priority` is set and does not satisfy
                0 <= priority < 1000.
        """
        # Check program to run and program parameters.
        if priority and not 0 <= priority < 1000:
            raise ValueError('priority must be between 0 and 1000')

        # Create job.
        job_name = _job_name_from_ids(project_id, program_id, job_id) if job_id else ''
        request = qtypes.QuantumJob(
            name=job_name,
            scheduling_config=qtypes.SchedulingConfig(
                processor_selector=qtypes.SchedulingConfig.ProcessorSelector(
                    processor_names=[
                        _processor_name_from_ids(project_id, processor_id)
                        for processor_id in processor_ids
                    ]
                )
            ),
            run_context=run_context,
        )
        if priority:
            request.scheduling_config.priority = priority
        if description:
            request.description = description
        if labels:
            request.labels.update(labels)
        job = self._make_request(
            lambda: self.grpc_client.create_quantum_job(
                _program_name_from_ids(project_id, program_id), request, False
            )
        )
        return _ids_from_job_name(job.name)[2], job

    def list_jobs(
        self,
        project_id: str,
        program_id: Optional[str] = None,
        created_before: Optional[Union[datetime.datetime, datetime.date]] = None,
        created_after: Optional[Union[datetime.datetime, datetime.date]] = None,
        has_labels: Optional[Dict[str, str]] = None,
        execution_states: Optional[Set[quantum.enums.ExecutionStatus.State]] = None,
    ):
        """Returns the list of jobs for a given program.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Optional, a unique ID of the program within the parent
                project. If None, jobs will be listed across all programs within
                the project.
            created_after: retrieve jobs that were created after this date
                or time.
            created_before: retrieve jobs that were created before this date
                or time.
            has_labels: retrieve jobs that have labels on them specified by
                this dict. If the value is set to `*`, jobs having the label
                regardless of the label value will be returned. For example, to
                query jobs that have the shape label and have the color
                label with value red, use

                {'color': 'red', 'shape':'*'}

            execution_states: retrieve jobs that have an execution state that
                is contained in `execution_states`. See
                `quantum.enums.ExecutionStatus.State` enum for accepted values.
        """
        filters = []

        if created_after is not None:
            val = _date_or_time_to_filter_expr('created_after', created_after)
            filters.append(f"create_time >= {val}")
        if created_before is not None:
            val = _date_or_time_to_filter_expr('created_before', created_before)
            filters.append(f"create_time <= {val}")
        if has_labels is not None:
            for (k, v) in has_labels.items():
                filters.append(f"labels.{k}:{v}")
        if execution_states is not None:
            state_filter = []
            for execution_state in execution_states:
                state_filter.append(f"execution_status.state = {execution_state.name}")
            filters.append(f"({' OR '.join(state_filter)})")

        # "-" is used as the program id to list jobs across all programs.
        if program_id is None:
            program_id = "-"
        parent = _program_name_from_ids(project_id, program_id)
        return self._make_request(
            lambda: self.grpc_client.list_quantum_jobs(parent, filter_=" AND ".join(filters))
        )

    def get_job(
        self, project_id: str, program_id: str, job_id: str, return_run_context: bool
    ) -> qtypes.QuantumJob:
        """Returns a previously created job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
            return_run_context: If true then the run context will be loaded
                from the job's run_context_location and set on the returned
                QuantumJob.
        """
        return self._make_request(
            lambda: self.grpc_client.get_quantum_job(
                _job_name_from_ids(project_id, program_id, job_id), return_run_context
            )
        )

    def set_job_description(
        self, project_id: str, program_id: str, job_id: str, description: str
    ) -> qtypes.QuantumJob:
        """Sets the description for a previously created quantum job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
            description: The new job description.

        Returns:
            The updated quantum job.
        """
        job_resource_name = _job_name_from_ids(project_id, program_id, job_id)
        return self._make_request(
            lambda: self.grpc_client.update_quantum_job(
                job_resource_name,
                qtypes.QuantumJob(name=job_resource_name, description=description),
                qtypes.field_mask_pb2.FieldMask(paths=['description']),
            )
        )

    def _set_job_labels(
        self,
        project_id: str,
        program_id: str,
        job_id: str,
        labels: Dict[str, str],
        fingerprint: str,
    ) -> qtypes.QuantumJob:
        """Sends an update that replaces a job's labels.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
            labels: The entire set of new job labels.
            fingerprint: The label fingerprint sent along with the update.

        Returns:
            The updated quantum job.
        """
        job_resource_name = _job_name_from_ids(project_id, program_id, job_id)
        return self._make_request(
            lambda: self.grpc_client.update_quantum_job(
                job_resource_name,
                qtypes.QuantumJob(
                    name=job_resource_name, labels=labels, label_fingerprint=fingerprint
                ),
                qtypes.field_mask_pb2.FieldMask(paths=['labels']),
            )
        )

    def set_job_labels(
        self, project_id: str, program_id: str, job_id: str, labels: Dict[str, str]
    ) -> qtypes.QuantumJob:
        """Sets (overwriting) the labels for a previously created quantum job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
            labels: The entire set of new job labels.

        Returns:
            The updated quantum job.
        """
        # Fetch the job first to obtain its current label fingerprint.
        job = self.get_job(project_id, program_id, job_id, False)
        return self._set_job_labels(project_id, program_id, job_id, labels, job.label_fingerprint)

    def add_job_labels(
        self, project_id: str, program_id: str, job_id: str, labels: Dict[str, str]
    ) -> qtypes.QuantumJob:
        """Adds new labels to a previously created quantum job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
            labels: New labels to add to the existing job labels.

        Returns:
            The updated quantum job, or the unmodified job if the labels would
            not change.
        """
        job = self.get_job(project_id, program_id, job_id, False)
        old_labels = job.labels
        new_labels = dict(old_labels)
        new_labels.update(labels)
        if new_labels != old_labels:
            fingerprint = job.label_fingerprint
            return self._set_job_labels(project_id, program_id, job_id, new_labels, fingerprint)
        return job

    def remove_job_labels(
        self, project_id: str, program_id: str, job_id: str, label_keys: List[str]
    ) -> qtypes.QuantumJob:
        """Removes labels with given keys from the labels of a previously
        created quantum job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
            label_keys: Label keys to remove from the existing job labels.

        Returns:
            The updated quantum job, or the unmodified job if the labels would
            not change.
        """
        job = self.get_job(project_id, program_id, job_id, False)
        old_labels = job.labels
        new_labels = dict(old_labels)
        for key in label_keys:
            new_labels.pop(key, None)
        if new_labels != old_labels:
            fingerprint = job.label_fingerprint
            return self._set_job_labels(project_id, program_id, job_id, new_labels, fingerprint)
        return job

    def delete_job(self, project_id: str, program_id: str, job_id: str) -> None:
        """Deletes a previously created quantum job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
        """
        self._make_request(
            lambda: self.grpc_client.delete_quantum_job(
                _job_name_from_ids(project_id, program_id, job_id)
            )
        )

    def cancel_job(self, project_id: str, program_id: str, job_id: str) -> None:
        """Cancels the given job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.
        """
        self._make_request(
            lambda: self.grpc_client.cancel_quantum_job(
                _job_name_from_ids(project_id, program_id, job_id)
            )
        )

    def get_job_results(
        self, project_id: str, program_id: str, job_id: str
    ) -> qtypes.QuantumResult:
        """Returns the results of a completed job.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            program_id: Unique ID of the program within the parent project.
            job_id: Unique ID of the job within the parent program.

        Returns:
            The quantum result.
        """
        return self._make_request(
            lambda: self.grpc_client.get_quantum_result(
                _job_name_from_ids(project_id, program_id, job_id)
            )
        )

    def list_processors(self, project_id: str) -> List[qtypes.QuantumProcessor]:
        """Returns a list of Processors that the user has visibility to in the
        current Engine project. The names of these processors are used to
        identify devices when scheduling jobs and gathering calibration metrics.

        Args:
            project_id: A project_id of the parent Google Cloud Project.

        Returns:
            A list of metadata of each processor.
        """
        response = self._make_request(
            lambda: self.grpc_client.list_quantum_processors(_project_name(project_id), filter_='')
        )
        return list(response)

    def get_processor(self, project_id: str, processor_id: str) -> qtypes.QuantumProcessor:
        """Returns a quantum processor.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.

        Returns:
            The quantum processor.
        """
        return self._make_request(
            lambda: self.grpc_client.get_quantum_processor(
                _processor_name_from_ids(project_id, processor_id)
            )
        )

    def list_calibrations(
        self, project_id: str, processor_id: str, filter_str: str = ''
    ) -> List[qtypes.QuantumCalibration]:
        """Returns a list of quantum calibrations.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            filter_str: Filter string currently only supports 'timestamp' with
                values of epoch time in seconds or short string 'yyyy-MM-dd'.
                For example:
                    'timestamp > 1577960125 AND timestamp <= 1578241810'
                    'timestamp > 2020-01-02 AND timestamp <= 2020-01-05'

        Returns:
            A list of calibrations.
        """
        response = self._make_request(
            lambda: self.grpc_client.list_quantum_calibrations(
                _processor_name_from_ids(project_id, processor_id), filter_=filter_str
            )
        )
        return list(response)

    def get_calibration(
        self, project_id: str, processor_id: str, calibration_timestamp_seconds: int
    ) -> qtypes.QuantumCalibration:
        """Returns a quantum calibration.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            calibration_timestamp_seconds: The timestamp of the calibration in
                seconds.

        Returns:
            The quantum calibration.
        """
        return self._make_request(
            lambda: self.grpc_client.get_quantum_calibration(
                _calibration_name_from_ids(project_id, processor_id, calibration_timestamp_seconds)
            )
        )

    def get_current_calibration(
        self, project_id: str, processor_id: str
    ) -> Optional[qtypes.QuantumCalibration]:
        """Returns the current quantum calibration for a processor if it has one.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.

        Returns:
            The quantum calibration or None if there is no current calibration.

        Raises:
            EngineException: If the request fails for any reason other than the
                calibration not being found.
        """
        try:
            return self._make_request(
                lambda: self.grpc_client.get_quantum_calibration(
                    _processor_name_from_ids(project_id, processor_id) + '/calibrations/current'
                )
            )
        except EngineException as err:
            # _make_request chains the original API error as the cause.
            if isinstance(err.__cause__, NotFound):
                return None
            raise

    def create_reservation(
        self,
        project_id: str,
        processor_id: str,
        start: datetime.datetime,
        end: datetime.datetime,
        whitelisted_users: Optional[List[str]] = None,
    ):
        """Creates a quantum reservation and returns the created object.

        The reservation is created with an empty name.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            start: the starting time of the reservation as a datetime object
            end: the ending time of the reservation as a datetime object
            whitelisted_users: a list of emails that can use the reservation.
        """
        parent = _processor_name_from_ids(project_id, processor_id)
        # Times are truncated to whole seconds.
        reservation = qtypes.QuantumReservation(
            name='',
            start_time=Timestamp(seconds=int(start.timestamp())),
            end_time=Timestamp(seconds=int(end.timestamp())),
        )
        if whitelisted_users:
            reservation.whitelisted_users.extend(whitelisted_users)
        return self._make_request(
            lambda: self.grpc_client.create_quantum_reservation(
                parent=parent, quantum_reservation=reservation
            )
        )

    def cancel_reservation(self, project_id: str, processor_id: str, reservation_id: str):
        """Cancels a quantum reservation.

        This action is only valid if the associated [QuantumProcessor]
        schedule has been frozen. Otherwise, delete_reservation should
        be used.

        The reservation will be truncated to end at the time when the request is
        serviced and any remaining time will be made available as an open swim
        period. This action will only succeed if the reservation has not yet
        ended and is within the processor's freeze window. If the reservation
        has already ended or is beyond the processor's freeze window, then the
        call will return an error.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            reservation_id: Unique ID of the reservation in the parent project.
        """
        name = _reservation_name_from_ids(project_id, processor_id, reservation_id)
        return self._make_request(lambda: self.grpc_client.cancel_quantum_reservation(name=name))

    def delete_reservation(self, project_id: str, processor_id: str, reservation_id: str):
        """Deletes a quantum reservation.

        This action is only valid if the associated [QuantumProcessor]
        schedule has not been frozen. Otherwise, cancel_reservation
        should be used.

        If the reservation has already ended or is within the processor's
        freeze window, then the call will return a `FAILED_PRECONDITION` error.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            reservation_id: Unique ID of the reservation in the parent project.
        """
        name = _reservation_name_from_ids(project_id, processor_id, reservation_id)
        return self._make_request(lambda: self.grpc_client.delete_quantum_reservation(name=name))

    def get_reservation(self, project_id: str, processor_id: str, reservation_id: str):
        """Gets a quantum reservation from the engine.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            reservation_id: Unique ID of the reservation in the parent project.

        Returns:
            The reservation, or None if it was not found.

        Raises:
            EngineException: If the request fails for any reason other than the
                reservation not being found.
        """
        try:
            name = _reservation_name_from_ids(project_id, processor_id, reservation_id)
            return self._make_request(lambda: self.grpc_client.get_quantum_reservation(name=name))
        except EngineException as err:
            # _make_request chains the original API error as the cause.
            if isinstance(err.__cause__, NotFound):
                return None
            raise

    def list_reservations(
        self, project_id: str, processor_id: str, filter_str: str = ''
    ) -> List[qtypes.QuantumReservation]:
        """Returns a list of quantum reservations.

        Only reservations owned by this project will be returned.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            filter_str: A string for filtering quantum reservations.
                The fields eligible for filtering are start_time and end_time
                Examples:
                    `start_time >= 1584385200`: Reservation began on or after
                        the epoch time Mar 16th, 7pm GMT.
                    `end_time >= 1483370475`: Reservation ends on
                        or after Jan 2nd 2017 15:21:15

        Returns:
            A list of QuantumReservation objects.
        """
        response = self._make_request(
            lambda: self.grpc_client.list_quantum_reservations(
                _processor_name_from_ids(project_id, processor_id), filter_=filter_str
            )
        )

        return list(response)

    def update_reservation(
        self,
        project_id: str,
        processor_id: str,
        reservation_id: str,
        start: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        whitelisted_users: Optional[List[str]] = None,
    ):
        """Updates a quantum reservation.

        This will update a quantum reservation's starting time, ending time,
        and list of whitelisted users. If any field is not filled, it will
        not be updated.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            reservation_id: Unique ID of the reservation in the parent project.
            start: the new starting time of the reservation as a datetime object
            end: the new ending time of the reservation as a datetime object
            whitelisted_users: a list of emails that can use the reservation.
                The empty list, [], will clear the whitelisted_users while None
                will leave the value unchanged.
        """
        name = (
            _reservation_name_from_ids(project_id, processor_id, reservation_id)
            if reservation_id
            else ''
        )

        reservation = qtypes.QuantumReservation(name=name)
        # Only fields listed in the update mask are sent as changes.
        paths = []
        if start:
            reservation.start_time.seconds = int(start.timestamp())
            paths.append('start_time')
        if end:
            reservation.end_time.seconds = int(end.timestamp())
            paths.append('end_time')
        # Identity check: an empty list is a real value (clear the users),
        # only None means "leave unchanged".
        if whitelisted_users is not None:
            reservation.whitelisted_users.extend(whitelisted_users)
            paths.append('whitelisted_users')

        return self._make_request(
            lambda: self.grpc_client.update_quantum_reservation(
                name=name,
                quantum_reservation=reservation,
                update_mask=qtypes.field_mask_pb2.FieldMask(paths=paths),
            )
        )

    def list_time_slots(
        self, project_id: str, processor_id: str, filter_str: str = ''
    ) -> List[qtypes.QuantumTimeSlot]:
        """Returns a list of quantum time slots on a processor.

        Args:
            project_id: A project_id of the parent Google Cloud Project.
            processor_id: The processor unique identifier.
            filter_str: A string expression for filtering the quantum
                time slots returned by the list command. The fields
                eligible for filtering are `start_time`, `end_time`.

        Returns:
            A list of QuantumTimeSlot objects.
        """
        response = self._make_request(
            lambda: self.grpc_client.list_quantum_time_slots(
                _processor_name_from_ids(project_id, processor_id), filter_=filter_str
            )
        )
        return list(response)


def _project_name(project_id: str) -> str:
    """Returns the resource name of a project."""
    return f'projects/{project_id}'


def _program_name_from_ids(project_id: str, program_id: str) -> str:
    """Returns the resource name of a program."""
    return f'projects/{project_id}/programs/{program_id}'


def _job_name_from_ids(project_id: str, program_id: str, job_id: str) -> str:
    """Returns the resource name of a job."""
    return f'projects/{project_id}/programs/{program_id}/jobs/{job_id}'


def _processor_name_from_ids(project_id: str, processor_id: str) -> str:
    """Returns the resource name of a processor."""
    return f'projects/{project_id}/processors/{processor_id}'


def _calibration_name_from_ids(
    project_id: str, processor_id: str, calibration_time_seconds: int
) -> str:
    """Returns the resource name of a calibration."""
    # %d is kept deliberately: it formats the timestamp as an integer.
    return 'projects/%s/processors/%s/calibrations/%d' % (
        project_id,
        processor_id,
        calibration_time_seconds,
    )


def _reservation_name_from_ids(project_id: str, processor_id: str, reservation_id: str) -> str:
    """Returns the resource name of a reservation."""
    return f'projects/{project_id}/processors/{processor_id}/reservations/{reservation_id}'


def _ids_from_program_name(program_name: str) -> Tuple[str, str]:
    """Splits a program resource name into (project id, program id)."""
    parts = program_name.split('/')
    return parts[1], parts[3]


def _ids_from_job_name(job_name: str) -> Tuple[str, str, str]:
    """Splits a job resource name into (project id, program id, job id)."""
    parts = job_name.split('/')
    return parts[1], parts[3], parts[5]


def _ids_from_processor_name(processor_name: str) -> Tuple[str, str]:
    """Splits a processor resource name into (project id, processor id)."""
    parts = processor_name.split('/')
    return parts[1], parts[3]


def _ids_from_calibration_name(calibration_name: str) -> Tuple[str, str, int]:
    """Splits a calibration resource name into (project id, processor id, timestamp)."""
    parts = calibration_name.split('/')
    return parts[1], parts[3], int(parts[5])


def _date_or_time_to_filter_expr(
    param_name: str, param: Union[datetime.datetime, datetime.date]
) -> str:
    """Formats datetime or date to filter expressions.

    Args:
        param_name: the name of the filter parameter (for error messaging)
        param: the value of the parameter

    Returns:
        The epoch time in whole seconds for a datetime, or the ISO formatted
        date for a date.

    Raises:
        ValueError: If the supplied param is not a datetime or date.
    """
    # datetime.datetime is a subclass of datetime.date, so it is checked first.
    if isinstance(param, datetime.datetime):
        return f"{int(param.timestamp())}"
    elif isinstance(param, datetime.date):
        return f"{param.isoformat()}"

    raise ValueError(
        f"Unsupported date/time type for {param_name}: got {param} of "
        f"type {type(param)}. Supported types: datetime.datetime and "
        f"datetime.date"
    )