#!/usr/bin/env python
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run xDS integration tests on GCP using Traffic Director."""

import argparse
import datetime
import json
import logging
import os
import random
import re
import shlex
import socket
import subprocess
import sys
import tempfile
import time
import uuid

from google.protobuf import json_format
import googleapiclient.discovery
import grpc
from oauth2client.client import GoogleCredentials

import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
from src.proto.grpc.health.v1 import health_pb2
from src.proto.grpc.health.v1 import health_pb2_grpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

# Envoy protos provided by PyPI package xds-protos
# Needs to import the generated Python file to load descriptors
# NOTE: both fault_pb2 imports are intentional even though the second name
# binding shadows the first — the imports are performed for their
# descriptor-registration side effects, not for the bound names.
try:
    from envoy.extensions.filters.common.fault.v3 import fault_pb2
    from envoy.extensions.filters.http.fault.v3 import fault_pb2
    from envoy.extensions.filters.http.router.v3 import router_pb2
    from envoy.extensions.filters.network.http_connection_manager.v3 import \
        http_connection_manager_pb2
    from envoy.service.status.v3 import csds_pb2
    from envoy.service.status.v3 import csds_pb2_grpc
except ImportError:
    # These protos are required by CSDS test. We should not fail the entire
    # script for one test case.
    pass

# Configure the root logger with a single stream handler; per-test verbosity
# is raised to DEBUG later if --verbose is passed.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python
original_grpc_trace = os.environ.pop('GRPC_TRACE', None)
original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None)
# Suppress not-essential logs for GCP clients
logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING)
logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)

# Test cases run by default (i.e. included in 'all').
_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'load_report_based_failover',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
    'path_matching',
    'header_matching',
    'api_listener',
    'forwarding_rule_port_match',
    'forwarding_rule_default_port',
    'metadata_filter',
]

# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    'circuit_breaking',
    'timeout',
    'fault_injection',
    'csds',
]

# Test cases that require the V3 API. Skipped in older runs.
_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])

# Test cases that require the alpha API. Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])


def parse_test_cases(arg):
    """Parse a comma-separated list of test case names.

    Args:
      arg: Comma-separated string of test case names. The token 'all' expands
        to every test in _TEST_CASES; names from _ADDITIONAL_TEST_CASES must be
        listed individually.

    Returns:
      A list of test case names, de-duplicated and ordered as they appear in
      _TEST_CASES + _ADDITIONAL_TEST_CASES.

    Raises:
      Exception: if any requested name is not a known test case.
    """
    if arg == '':
        return []
    arg_split = arg.split(',')
    test_cases = set()
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    # Use a distinct loop variable: the original code shadowed `arg`, which
    # made the error message below report only the last token instead of the
    # full, user-supplied argument.
    for test_case_token in arg_split:
        if test_case_token == "all":
            test_cases = test_cases.union(_TEST_CASES)
        else:
            test_cases = test_cases.union([test_case_token])
    if not all(test_case in all_test_cases for test_case in test_cases):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]


def parse_port_range(port_arg):
    """Parse a port argument into a range object.

    Args:
      port_arg: Either a single port ('8080') or a 'min:max' range
        ('8080:8280').

    Returns:
      A range covering the requested ports, inclusive of max.
    """
    try:
        port = int(port_arg)
        return range(port, port + 1)
    # Only a failed int() conversion should fall through to range parsing;
    # the original bare `except:` also swallowed SystemExit/KeyboardInterrupt.
    except ValueError:
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)


argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument('--halt_after_fail',
                  action='store_true',
                  help='Halt and save the resources when test failed.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
                  default=_DEFAULT_PORT_RANGE,
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases '
    '(e.g., round_robin) may not give meaningful results if this is set to a '
    'value less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  default=False,
                  action='store_true')
args = argp.parse_args()

if args.verbose:
    logger.setLevel(logging.DEBUG)

CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Each of the config propagation in the control plane should finish within 600s.
# Otherwise, it indicates a bug in the control plane. The config propagation
# includes all kinds of traffic config update, like updating urlMap, creating
# the resources for the first time, updating BackendService, and changing the
# status of endpoints in BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, slow
# config update could lead to empty EDS or similar symptoms causing the
# connection to hang for a long period of time. So, we want to extend the stats
# wait time to be the same as urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# xDS bootstrap JSON. Network name, zone, and xDS server URI are baked in now
# (via %-formatting); {node_id} and {server_features} are filled in later via
# str.format(), hence the doubled braces everywhere else.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
341_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric' 342_TEST_METADATA_NUMERIC_VALUE = '159' 343_PATH_MATCHER_NAME = 'path-matcher' 344_BASE_TEMPLATE_NAME = 'test-template' 345_BASE_INSTANCE_GROUP_NAME = 'test-ig' 346_BASE_HEALTH_CHECK_NAME = 'test-hc' 347_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' 348_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' 349_BASE_URL_MAP_NAME = 'test-map' 350_BASE_SERVICE_HOST = 'grpc-test' 351_BASE_TARGET_PROXY_NAME = 'test-target-proxy' 352_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' 353_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 354 '../../reports') 355_SPONGE_LOG_NAME = 'sponge_log.log' 356_SPONGE_XML_NAME = 'sponge_log.xml' 357 358 359def get_client_stats(num_rpcs, timeout_sec): 360 if CLIENT_HOSTS: 361 hosts = CLIENT_HOSTS 362 else: 363 hosts = ['localhost'] 364 for host in hosts: 365 with grpc.insecure_channel('%s:%d' % 366 (host, args.stats_port)) as channel: 367 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 368 request = messages_pb2.LoadBalancerStatsRequest() 369 request.num_rpcs = num_rpcs 370 request.timeout_sec = timeout_sec 371 rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC 372 logger.debug('Invoking GetClientStats RPC to %s:%d:', host, 373 args.stats_port) 374 response = stub.GetClientStats(request, 375 wait_for_ready=True, 376 timeout=rpc_timeout) 377 logger.debug('Invoked GetClientStats RPC to %s: %s', host, 378 json_format.MessageToJson(response)) 379 return response 380 381 382def get_client_accumulated_stats(): 383 if CLIENT_HOSTS: 384 hosts = CLIENT_HOSTS 385 else: 386 hosts = ['localhost'] 387 for host in hosts: 388 with grpc.insecure_channel('%s:%d' % 389 (host, args.stats_port)) as channel: 390 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 391 request = messages_pb2.LoadBalancerAccumulatedStatsRequest() 392 logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', 393 host, args.stats_port) 394 response = stub.GetClientAccumulatedStats( 
395 request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) 396 logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', 397 host, response) 398 return response 399 400 401def get_client_xds_config_dump(): 402 if CLIENT_HOSTS: 403 hosts = CLIENT_HOSTS 404 else: 405 hosts = ['localhost'] 406 for host in hosts: 407 server_address = '%s:%d' % (host, args.stats_port) 408 with grpc.insecure_channel(server_address) as channel: 409 stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel) 410 logger.debug('Fetching xDS config dump from %s', server_address) 411 response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(), 412 wait_for_ready=True, 413 timeout=_CONNECTION_TIMEOUT_SEC) 414 logger.debug('Fetched xDS config dump from %s', server_address) 415 if len(response.config) != 1: 416 logger.error('Unexpected number of ClientConfigs %d: %s', 417 len(response.config), response) 418 return None 419 else: 420 # Converting the ClientStatusResponse into JSON, because many 421 # fields are packed in google.protobuf.Any. It will require many 422 # duplicated code to unpack proto message and inspect values. 
423 return json_format.MessageToDict( 424 response.config[0], preserving_proto_field_name=True) 425 426 427def configure_client(rpc_types, metadata=[], timeout_sec=None): 428 if CLIENT_HOSTS: 429 hosts = CLIENT_HOSTS 430 else: 431 hosts = ['localhost'] 432 for host in hosts: 433 with grpc.insecure_channel('%s:%d' % 434 (host, args.stats_port)) as channel: 435 stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) 436 request = messages_pb2.ClientConfigureRequest() 437 request.types.extend(rpc_types) 438 for rpc_type, md_key, md_value in metadata: 439 md = request.metadata.add() 440 md.type = rpc_type 441 md.key = md_key 442 md.value = md_value 443 if timeout_sec: 444 request.timeout_sec = timeout_sec 445 logger.debug( 446 'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', 447 host, args.stats_port, request) 448 stub.Configure(request, 449 wait_for_ready=True, 450 timeout=_CONNECTION_TIMEOUT_SEC) 451 logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', 452 host) 453 454 455class RpcDistributionError(Exception): 456 pass 457 458 459def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, 460 allow_failures): 461 start_time = time.time() 462 error_msg = None 463 logger.debug('Waiting for %d sec until backends %s receive load' % 464 (timeout_sec, backends)) 465 while time.time() - start_time <= timeout_sec: 466 error_msg = None 467 stats = get_client_stats(num_rpcs, timeout_sec) 468 rpcs_by_peer = stats.rpcs_by_peer 469 for backend in backends: 470 if backend not in rpcs_by_peer: 471 error_msg = 'Backend %s did not receive load' % backend 472 break 473 if not error_msg and len(rpcs_by_peer) > len(backends): 474 error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer 475 if not allow_failures and stats.num_failures > 0: 476 error_msg = '%d RPCs failed' % stats.num_failures 477 if not error_msg: 478 return 479 raise RpcDistributionError(error_msg) 480 481 482def 
wait_until_all_rpcs_go_to_given_backends_or_fail(backends, 483 timeout_sec, 484 num_rpcs=_NUM_TEST_RPCS): 485 _verify_rpcs_to_given_backends(backends, 486 timeout_sec, 487 num_rpcs, 488 allow_failures=True) 489 490 491def wait_until_all_rpcs_go_to_given_backends(backends, 492 timeout_sec, 493 num_rpcs=_NUM_TEST_RPCS): 494 _verify_rpcs_to_given_backends(backends, 495 timeout_sec, 496 num_rpcs, 497 allow_failures=False) 498 499 500def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec): 501 start_time = time.time() 502 while time.time() - start_time <= timeout_sec: 503 stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec) 504 error_msg = None 505 rpcs_by_peer = stats.rpcs_by_peer 506 for backend in backends: 507 if backend in rpcs_by_peer: 508 error_msg = 'Unexpected backend %s receives load' % backend 509 break 510 if not error_msg: 511 return 512 raise Exception('Unexpected RPCs going to given backends') 513 514 515def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): 516 '''Block until the test client reaches the state with the given number 517 of RPCs being outstanding stably. 518 519 Args: 520 rpc_type: A string indicating the RPC method to check for. Either 521 'UnaryCall' or 'EmptyCall'. 522 timeout_sec: Maximum number of seconds to wait until the desired state 523 is reached. 524 num_rpcs: Expected number of RPCs to be in-flight. 525 threshold: Number within [0,100], the tolerable percentage by which 526 the actual number of RPCs in-flight can differ from the expected number. 
527 ''' 528 if threshold < 0 or threshold > 100: 529 raise ValueError('Value error: Threshold should be between 0 to 100') 530 threshold_fraction = threshold / 100.0 531 start_time = time.time() 532 error_msg = None 533 logger.debug( 534 'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' % 535 (timeout_sec, num_rpcs, rpc_type, threshold)) 536 while time.time() - start_time <= timeout_sec: 537 error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, 538 threshold_fraction) 539 if error_msg: 540 logger.debug('Progress: %s', error_msg) 541 time.sleep(2) 542 else: 543 break 544 # Ensure the number of outstanding RPCs is stable. 545 if not error_msg: 546 time.sleep(5) 547 error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, 548 threshold_fraction) 549 if error_msg: 550 raise Exception("Wrong number of %s RPCs in-flight: %s" % 551 (rpc_type, error_msg)) 552 553 554def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): 555 error_msg = None 556 stats = get_client_accumulated_stats() 557 rpcs_started = stats.num_rpcs_started_by_method[rpc_type] 558 rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type] 559 rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type] 560 rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed 561 if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): 562 error_msg = ('actual(%d) < expected(%d - %d%%)' % 563 (rpcs_in_flight, num_rpcs, threshold)) 564 elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): 565 error_msg = ('actual(%d) > expected(%d + %d%%)' % 566 (rpcs_in_flight, num_rpcs, threshold)) 567 return error_msg 568 569 570def compare_distributions(actual_distribution, expected_distribution, 571 threshold): 572 """Compare if two distributions are similar. 573 574 Args: 575 actual_distribution: A list of floats, contains the actual distribution. 576 expected_distribution: A list of floats, contains the expected distribution. 
577 threshold: Number within [0,100], the threshold percentage by which the 578 actual distribution can differ from the expected distribution. 579 580 Returns: 581 The similarity between the distributions as a boolean. Returns true if the 582 actual distribution lies within the threshold of the expected 583 distribution, false otherwise. 584 585 Raises: 586 ValueError: if threshold is not with in [0,100]. 587 Exception: containing detailed error messages. 588 """ 589 if len(expected_distribution) != len(actual_distribution): 590 raise Exception( 591 'Error: expected and actual distributions have different size (%d vs %d)' 592 % (len(expected_distribution), len(actual_distribution))) 593 if threshold < 0 or threshold > 100: 594 raise ValueError('Value error: Threshold should be between 0 to 100') 595 threshold_fraction = threshold / 100.0 596 for expected, actual in zip(expected_distribution, actual_distribution): 597 if actual < (expected * (1 - threshold_fraction)): 598 raise Exception("actual(%f) < expected(%f-%d%%)" % 599 (actual, expected, threshold)) 600 if actual > (expected * (1 + threshold_fraction)): 601 raise Exception("actual(%f) > expected(%f+%d%%)" % 602 (actual, expected, threshold)) 603 return True 604 605 606def compare_expected_instances(stats, expected_instances): 607 """Compare if stats have expected instances for each type of RPC. 608 609 Args: 610 stats: LoadBalancerStatsResponse reported by interop client. 611 expected_instances: a dict with key as the RPC type (string), value as 612 the expected backend instances (list of strings). 613 614 Returns: 615 Returns true if the instances are expected. False if not. 
616 """ 617 for rpc_type, expected_peers in expected_instances.items(): 618 rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type] 619 rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None 620 logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer) 621 peers = list(rpcs_by_peer.keys()) 622 if set(peers) != set(expected_peers): 623 logger.info('unexpected peers for %s, got %s, want %s', rpc_type, 624 peers, expected_peers) 625 return False 626 return True 627 628 629def test_backends_restart(gcp, backend_service, instance_group): 630 logger.info('Running test_backends_restart') 631 instance_names = get_instance_names(gcp, instance_group) 632 num_instances = len(instance_names) 633 start_time = time.time() 634 wait_until_all_rpcs_go_to_given_backends(instance_names, 635 _WAIT_FOR_STATS_SEC) 636 try: 637 resize_instance_group(gcp, instance_group, 0) 638 wait_until_all_rpcs_go_to_given_backends_or_fail([], 639 _WAIT_FOR_BACKEND_SEC) 640 finally: 641 resize_instance_group(gcp, instance_group, num_instances) 642 wait_for_healthy_backends(gcp, backend_service, instance_group) 643 new_instance_names = get_instance_names(gcp, instance_group) 644 wait_until_all_rpcs_go_to_given_backends(new_instance_names, 645 _WAIT_FOR_BACKEND_SEC) 646 647 648def test_change_backend_service(gcp, original_backend_service, instance_group, 649 alternate_backend_service, 650 same_zone_instance_group): 651 logger.info('Running test_change_backend_service') 652 original_backend_instances = get_instance_names(gcp, instance_group) 653 alternate_backend_instances = get_instance_names(gcp, 654 same_zone_instance_group) 655 patch_backend_service(gcp, alternate_backend_service, 656 [same_zone_instance_group]) 657 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 658 wait_for_healthy_backends(gcp, alternate_backend_service, 659 same_zone_instance_group) 660 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 661 
_WAIT_FOR_STATS_SEC) 662 passed = True 663 try: 664 patch_url_map_backend_service(gcp, alternate_backend_service) 665 wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, 666 _WAIT_FOR_URL_MAP_PATCH_SEC) 667 except Exception: 668 passed = False 669 raise 670 finally: 671 if passed or not args.halt_after_fail: 672 patch_url_map_backend_service(gcp, original_backend_service) 673 patch_backend_service(gcp, alternate_backend_service, []) 674 675 676def test_gentle_failover(gcp, 677 backend_service, 678 primary_instance_group, 679 secondary_instance_group, 680 swapped_primary_and_secondary=False): 681 logger.info('Running test_gentle_failover') 682 num_primary_instances = len(get_instance_names(gcp, primary_instance_group)) 683 min_instances_for_gentle_failover = 3 # Need >50% failure to start failover 684 passed = True 685 try: 686 if num_primary_instances < min_instances_for_gentle_failover: 687 resize_instance_group(gcp, primary_instance_group, 688 min_instances_for_gentle_failover) 689 patch_backend_service( 690 gcp, backend_service, 691 [primary_instance_group, secondary_instance_group]) 692 primary_instance_names = get_instance_names(gcp, primary_instance_group) 693 secondary_instance_names = get_instance_names(gcp, 694 secondary_instance_group) 695 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 696 wait_for_healthy_backends(gcp, backend_service, 697 secondary_instance_group) 698 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 699 _WAIT_FOR_STATS_SEC) 700 instances_to_stop = primary_instance_names[:-1] 701 remaining_instances = primary_instance_names[-1:] 702 try: 703 set_serving_status(instances_to_stop, 704 gcp.service_port, 705 serving=False) 706 wait_until_all_rpcs_go_to_given_backends( 707 remaining_instances + secondary_instance_names, 708 _WAIT_FOR_BACKEND_SEC) 709 finally: 710 set_serving_status(primary_instance_names, 711 gcp.service_port, 712 serving=True) 713 except RpcDistributionError as e: 
714 if not swapped_primary_and_secondary and is_primary_instance_group( 715 gcp, secondary_instance_group): 716 # Swap expectation of primary and secondary instance groups. 717 test_gentle_failover(gcp, 718 backend_service, 719 secondary_instance_group, 720 primary_instance_group, 721 swapped_primary_and_secondary=True) 722 else: 723 passed = False 724 raise e 725 except Exception: 726 passed = False 727 raise 728 finally: 729 if passed or not args.halt_after_fail: 730 patch_backend_service(gcp, backend_service, 731 [primary_instance_group]) 732 resize_instance_group(gcp, primary_instance_group, 733 num_primary_instances) 734 instance_names = get_instance_names(gcp, primary_instance_group) 735 wait_until_all_rpcs_go_to_given_backends(instance_names, 736 _WAIT_FOR_BACKEND_SEC) 737 738 739def test_load_report_based_failover(gcp, backend_service, 740 primary_instance_group, 741 secondary_instance_group): 742 logger.info('Running test_load_report_based_failover') 743 passed = True 744 try: 745 patch_backend_service( 746 gcp, backend_service, 747 [primary_instance_group, secondary_instance_group]) 748 primary_instance_names = get_instance_names(gcp, primary_instance_group) 749 secondary_instance_names = get_instance_names(gcp, 750 secondary_instance_group) 751 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 752 wait_for_healthy_backends(gcp, backend_service, 753 secondary_instance_group) 754 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 755 _WAIT_FOR_STATS_SEC) 756 # Set primary locality's balance mode to RATE, and RPS to 20% of the 757 # client's QPS. The secondary locality will be used. 
758 max_rate = int(args.qps * 1 / 5) 759 logger.info('Patching backend service to RATE with %d max_rate', 760 max_rate) 761 patch_backend_service( 762 gcp, 763 backend_service, [primary_instance_group, secondary_instance_group], 764 balancing_mode='RATE', 765 max_rate=max_rate) 766 wait_until_all_rpcs_go_to_given_backends( 767 primary_instance_names + secondary_instance_names, 768 _WAIT_FOR_BACKEND_SEC) 769 770 # Set primary locality's balance mode to RATE, and RPS to 120% of the 771 # client's QPS. Only the primary locality will be used. 772 max_rate = int(args.qps * 6 / 5) 773 logger.info('Patching backend service to RATE with %d max_rate', 774 max_rate) 775 patch_backend_service( 776 gcp, 777 backend_service, [primary_instance_group, secondary_instance_group], 778 balancing_mode='RATE', 779 max_rate=max_rate) 780 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 781 _WAIT_FOR_BACKEND_SEC) 782 logger.info("success") 783 except Exception: 784 passed = False 785 raise 786 finally: 787 if passed or not args.halt_after_fail: 788 patch_backend_service(gcp, backend_service, 789 [primary_instance_group]) 790 instance_names = get_instance_names(gcp, primary_instance_group) 791 wait_until_all_rpcs_go_to_given_backends(instance_names, 792 _WAIT_FOR_BACKEND_SEC) 793 794 795def test_ping_pong(gcp, backend_service, instance_group): 796 logger.info('Running test_ping_pong') 797 wait_for_healthy_backends(gcp, backend_service, instance_group) 798 instance_names = get_instance_names(gcp, instance_group) 799 wait_until_all_rpcs_go_to_given_backends(instance_names, 800 _WAIT_FOR_STATS_SEC) 801 802 803def test_remove_instance_group(gcp, backend_service, instance_group, 804 same_zone_instance_group): 805 logger.info('Running test_remove_instance_group') 806 passed = True 807 try: 808 patch_backend_service(gcp, 809 backend_service, 810 [instance_group, same_zone_instance_group], 811 balancing_mode='RATE') 812 wait_for_healthy_backends(gcp, backend_service, 
instance_group) 813 wait_for_healthy_backends(gcp, backend_service, 814 same_zone_instance_group) 815 instance_names = get_instance_names(gcp, instance_group) 816 same_zone_instance_names = get_instance_names(gcp, 817 same_zone_instance_group) 818 try: 819 wait_until_all_rpcs_go_to_given_backends( 820 instance_names + same_zone_instance_names, 821 _WAIT_FOR_OPERATION_SEC) 822 remaining_instance_group = same_zone_instance_group 823 remaining_instance_names = same_zone_instance_names 824 except RpcDistributionError as e: 825 # If connected to TD in a different zone, we may route traffic to 826 # only one instance group. Determine which group that is to continue 827 # with the remainder of the test case. 828 try: 829 wait_until_all_rpcs_go_to_given_backends( 830 instance_names, _WAIT_FOR_STATS_SEC) 831 remaining_instance_group = same_zone_instance_group 832 remaining_instance_names = same_zone_instance_names 833 except RpcDistributionError as e: 834 wait_until_all_rpcs_go_to_given_backends( 835 same_zone_instance_names, _WAIT_FOR_STATS_SEC) 836 remaining_instance_group = instance_group 837 remaining_instance_names = instance_names 838 patch_backend_service(gcp, 839 backend_service, [remaining_instance_group], 840 balancing_mode='RATE') 841 wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, 842 _WAIT_FOR_BACKEND_SEC) 843 except Exception: 844 passed = False 845 raise 846 finally: 847 if passed or not args.halt_after_fail: 848 patch_backend_service(gcp, backend_service, [instance_group]) 849 wait_until_all_rpcs_go_to_given_backends(instance_names, 850 _WAIT_FOR_BACKEND_SEC) 851 852 853def test_round_robin(gcp, backend_service, instance_group): 854 logger.info('Running test_round_robin') 855 wait_for_healthy_backends(gcp, backend_service, instance_group) 856 instance_names = get_instance_names(gcp, instance_group) 857 threshold = 1 858 wait_until_all_rpcs_go_to_given_backends(instance_names, 859 _WAIT_FOR_STATS_SEC) 860 # TODO(ericgribkoff) Delayed config 
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify the secondary locality stays idle while the primary is only
    partially down.

    Stops serving on one primary instance and expects the remaining
    primary instances (not the secondary locality) to absorb all
    traffic. If traffic is observed on the secondary group instead and
    TD considers it the primary (possible when connected to TD in a
    different zone), the test re-runs itself once with the two groups
    swapped (`swapped_primary_and_secondary=True` stops further
    recursion).
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Take exactly one primary instance out of serving.
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the wait above fails.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    finally:
        # Restore the backend service to primary-only unless we are halting
        # after a failure for debugging.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify traffic fails over to the secondary locality when the whole
    primary locality stops serving.

    Stops serving on every primary instance and expects all traffic to
    shift to the secondary instances. If the expectation is inverted
    because TD treats the other group as primary, the test re-runs once
    with the groups swapped (`swapped_primary_and_secondary=True` stops
    further recursion).
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            # Fail the entire primary locality.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the wait above fails.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    finally:
        # Restore the backend service to primary-only unless we are halting
        # after a failure for debugging.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Bring both backend services into the known starting state for url-map
    tests.

    Waits for the original backends to become healthy, attaches the
    same-zone instance group to the alternate backend service and waits
    for those backends too, then waits until 100% of traffic is served
    by the original backends.

    Returns:
        Tuple (original_instances, alternate_instances) — instance names
        as lists of strings.
    """
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_instances)

    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_instances)

    # Every url-map test starts from "all traffic on the original service".
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_instances, alternate_instances
def test_metadata_filter(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify url-map metadataFilters (MATCH_ALL / MATCH_ANY) route traffic
    to the expected backend service.

    Builds `match_labels` from the client's xDS bootstrap node metadata
    so that filters referencing them always match this client, while
    `not_match_labels` never match. Each scenario in `test_route_rules`
    is expected to steer all traffic away from the original backends and
    onto the alternate backends.
    """
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    passed = True
    try:
        # NOTE(review): bootstrap_path is a module-level global pointing at
        # the client's xDS bootstrap file — TODO confirm against setup code.
        with open(bootstrap_path) as f:
            md = json.load(f)['node']['metadata']
            match_labels = []
            for k, v in md.items():
                match_labels.append({'name': k, 'value': v})

        not_match_labels = [{'name': 'fake', 'value': 'fail'}]
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY
            # test MATCH_ALL: super set labels won't match
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test MATCH_ANY
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test match multiple route rules
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
            ]
        ]

        # For each scenario: start from all-traffic-on-original, apply the
        # rules, expect traffic to move entirely to alternate, then reset
        # the url map before the next scenario.
        for route_rules in test_route_rules:
            wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)
            wait_until_no_rpcs_go_to_given_backends(original_backend_instances,
                                                    _WAIT_FOR_STATS_SEC)
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp, original_backend_service)
    except Exception:
        passed = False
        raise
    finally:
        # Detach alternate backends unless halting after failure for debugging.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, alternate_backend_service, [])
def test_api_listener(gcp, backend_service, instance_group,
                      alternate_backend_service):
    """Verify the client keeps working while a duplicate API-listener config
    suite exists, and stops when its host rule is removed.

    Creates a second url-map/target-proxy/forwarding-rule suite with the
    same host name (proxyless validation disabled), deletes the original
    suite, confirms traffic continues via the duplicate, then removes the
    host rule and confirms traffic stops. The original configuration is
    recreated in the `finally` block.

    Fix applied: error message typo 'Faied' -> 'Failed'.
    """
    logger.info("Running api_listener")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        # create a second suite of map+tp+fr with the same host name in host rule
        # and we have to disable proxyless validation because it needs `0.0.0.0`
        # ip address in fr for proxyless and also we violate ip:port uniqueness
        # for test purpose. See https://github.com/grpc/grpc-java/issues/8009
        new_config_suffix = '2'
        url_map_2 = create_url_map(gcp, url_map_name + new_config_suffix,
                                   backend_service, service_host_name)
        target_proxy_2 = create_target_proxy(
            gcp, target_proxy_name + new_config_suffix, False, url_map_2)
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid port for the forwarding rule')
        # Candidate IPs for the duplicate forwarding rule; random to avoid
        # colliding with addresses already in use.
        max_attempts = 10
        potential_ip_addresses = [
            '10.10.10.%d' % (random.randint(0, 255))
            for _ in range(max_attempts)
        ]
        create_global_forwarding_rule(gcp,
                                      forwarding_rule_name + new_config_suffix,
                                      [gcp.service_port],
                                      potential_ip_addresses, target_proxy_2)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp,
                                              url_map_name + new_config_suffix,
                                              backend_service,
                                              service_host_name)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # Delete the original suite; traffic must keep flowing via suite 2.
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
                              args.qps)
        for i in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
        # delete host rule for the original host name
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)

    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Recreate the original configuration suite.
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            # NOTE(review): returning from `finally` suppresses any in-flight
            # exception when halt_after_fail is unset — preserved here because
            # the caller relies on receiving the refreshed server_uri, but
            # worth confirming this swallowing is intentional.
            return server_uri
def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    """Verify RPCs stop when the forwarding rule no longer covers the serving
    port.

    Recreates the global forwarding rule over every port in the default
    range except the one actually served, then checks that no RPCs reach
    the backends. The original rule (and host rule, if needed) is
    restored afterwards and the refreshed server URI returned.
    """
    logger.info("Running test_forwarding_rule_port_match")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(instances,
                                                 _WAIT_FOR_STATS_SEC)
        delete_global_forwarding_rules(gcp)
        # Every port in the default range EXCEPT the serving port.
        non_matching_ports = [
            port for port in parse_port_range(_DEFAULT_PORT_RANGE)
            if port != gcp.service_port
        ]
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      non_matching_ports)
        wait_until_no_rpcs_go_to_given_backends(instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the original forwarding rule configuration.
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri
def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    """Exercise forwarding-rule/url-map port semantics when the client URI
    carries no explicit port.

    Covers three scenarios: (1) url-map host rule pins a port while the
    URI has none — expect failure; (2) neither URI nor url-map has a
    port and the forwarding rule listens on 80 — expect success;
    (3) url-map regains a port — expect failure again. The original
    configuration suite is recreated in `finally` and the refreshed
    server URI returned.
    """
    logger.info("Running test_forwarding_rule_default_port")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        instances = get_instance_names(gcp, instance_group)
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(instances,
                                                     _WAIT_FOR_STATS_SEC)
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          parse_port_range(_DEFAULT_PORT_RANGE))
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
            wait_until_no_rpcs_go_to_given_backends(instances,
                                                    _WAIT_FOR_STATS_SEC)
        # expect success when no port in client request service uri, and no port in url-map
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name, False)
        # Random candidate IPs to avoid collisions with addresses in use.
        potential_ip_addresses = [
            '10.10.10.%d' % (random.randint(0, 255)) for _ in range(10)
        ]
        create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
                                      potential_ip_addresses)
        wait_until_all_rpcs_go_to_given_backends(instances,
                                                 _WAIT_FOR_STATS_SEC)

        # expect failure when no port in client request uri, but specify port in url-map
        patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
                                          service_host_name)
        wait_until_no_rpcs_go_to_given_backends(instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Recreate the original configuration suite.
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify weighted traffic splitting between two backend services.

    Starts with all traffic on original_backend_service, then patches the
    url map's default action to a 20/80 weighted split between original
    and alternate. Waits for all backends in both services to receive
    traffic, then verifies the observed per-instance distribution matches
    the expected weights within tolerance.

    Fix applied: the comprehension variables used to collect per-peer
    counts previously shadowed the retry-loop index `i` (which is logged
    later in the loop); they are renamed for clarity.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # Patch urlmap, change route action to traffic splitting between
        # original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40].
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Verify that weights between two services are expected.
        retry_count = 10
        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
        # seconds timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            got_instance_count = [
                stats.rpcs_by_peer[peer] for peer in original_backend_instances
            ] + [
                stats.rpcs_by_peer[peer] for peer in alternate_backend_instances
            ]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                count * 100.0 / total_count for count in got_instance_count
            ]

            try:
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if i == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage, expected_instance_percentage))
            else:
                logger.info("success")
                break
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Reset the url map and detach alternate backends.
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify url-map path matching (fullPath, prefix, regex, ignoreCase).

    Starts with all traffic (UnaryCall and EmptyCall) on
    original_backend_service, then for each scenario patches the url map
    with route rules that send specific methods to the alternate service
    and verifies traffic reaches the expected backends per method.

    Fix applied: the regex route rule used a non-raw string
    ('^\\/...') containing the invalid escape sequence `\\/`, which
    raises SyntaxWarning/DeprecationWarning on modern Python; it is now
    a raw string with byte-identical value.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Regex UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'regexMatch':
                            r'^\/.*\/UnaryCall$'  # Unary methods with any services.
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # ignoreCase EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        # Case insensitive matching.
                        'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
                        'ignoreCase': True,
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Reset the url map and detach alternate backends.
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify url-map header matching (exact, prefix, suffix, present,
    invert, range, regex) routes each RPC method to the expected backends.
    """
    # This test start with all traffic (UnaryCall and EmptyCall) going to
    # original_backend_service.
    #
    # Then it updates URL-map to add routes, to make RPCs with test headers to
    # go to different backends. It waits for all backends in both services to
    # receive traffic, then verifies that traffic goes to the expected
    # backends.
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # Header ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header PrefixMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header SuffixMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' present -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'presentMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header invert ExactMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to
                    # original. EmptyCall will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_UNARY,
                            'invertMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
                    # UnaryCall is sent with the metadata in range.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'rangeMatch': {
                                'rangeStart': '100',
                                'rangeEnd': '200'
                            }
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header RegexMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName':
                                _TEST_METADATA_KEY,
                            'regexMatch':
                                "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
                                              _TEST_METADATA_VALUE_EMPTY[-2:])
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        # Reset the url map and detach alternate backends unless halting
        # after a failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1730 logger.info( 1731 'waiting for traffic to go to all backends (including alternate)' 1732 ) 1733 wait_until_all_rpcs_go_to_given_backends( 1734 original_backend_instances + alternate_backend_instances, 1735 _WAIT_FOR_STATS_SEC) 1736 1737 retry_count = 80 1738 # Each attempt takes about 5 seconds, 80 retries is equivalent to 400 1739 # seconds timeout. 1740 for i in range(retry_count): 1741 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 1742 if not stats.rpcs_by_method: 1743 raise ValueError( 1744 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 1745 ) 1746 logger.info('attempt %d', i) 1747 if compare_expected_instances(stats, expected_instances): 1748 logger.info("success") 1749 break 1750 elif i == retry_count - 1: 1751 raise Exception( 1752 'timeout waiting for RPCs to the expected instances: %s' 1753 % expected_instances) 1754 except Exception: 1755 passed = False 1756 raise 1757 finally: 1758 if passed or not args.halt_after_fail: 1759 patch_url_map_backend_service(gcp, original_backend_service) 1760 patch_backend_service(gcp, alternate_backend_service, []) 1761 1762 1763def test_circuit_breaking(gcp, original_backend_service, instance_group, 1764 same_zone_instance_group): 1765 ''' 1766 Since backend service circuit_breakers configuration cannot be unset, 1767 which causes trouble for restoring validate_for_proxy flag in target 1768 proxy/global forwarding rule. This test uses dedicated backend sevices. 
1769 The url_map and backend services undergoes the following state changes: 1770 1771 Before test: 1772 original_backend_service -> [instance_group] 1773 extra_backend_service -> [] 1774 more_extra_backend_service -> [] 1775 1776 url_map -> [original_backend_service] 1777 1778 In test: 1779 extra_backend_service (with circuit_breakers) -> [instance_group] 1780 more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group] 1781 1782 url_map -> [extra_backend_service, more_extra_backend_service] 1783 1784 After test: 1785 original_backend_service -> [instance_group] 1786 extra_backend_service (with circuit_breakers) -> [] 1787 more_extra_backend_service (with circuit_breakers) -> [] 1788 1789 url_map -> [original_backend_service] 1790 ''' 1791 logger.info('Running test_circuit_breaking') 1792 additional_backend_services = [] 1793 passed = True 1794 try: 1795 # TODO(chengyuanzhang): Dedicated backend services created for circuit 1796 # breaking test. Once the issue for unsetting backend service circuit 1797 # breakers is resolved or configuring backend service circuit breakers is 1798 # enabled for config validation, these dedicated backend services can be 1799 # eliminated. 1800 extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix 1801 more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix 1802 extra_backend_service = add_backend_service(gcp, 1803 extra_backend_service_name) 1804 additional_backend_services.append(extra_backend_service) 1805 more_extra_backend_service = add_backend_service( 1806 gcp, more_extra_backend_service_name) 1807 additional_backend_services.append(more_extra_backend_service) 1808 # The config validation for proxyless doesn't allow setting 1809 # circuit_breakers. Disable validate validate_for_proxyless 1810 # for this test. This can be removed when validation 1811 # accepts circuit_breakers. 
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        # Per-service circuit-breaker thresholds exercised by the test.
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backend to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        # Route each RPC method to its own backend service so the two
        # circuit-breaker limits can be observed independently.
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open.
        # 'keep-open' makes the server hold every RPC open, so outstanding
        # RPCs accumulate until the circuit breaker caps them at maxRequests.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        # Timeout scales with the threshold because in-flight RPCs grow at
        # roughly args.qps per second.
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increment circuit breakers max_requests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        # In-flight RPCs should rise to the new, higher threshold.
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original routing and delete the dedicated backend
        # services, unless halting after failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, original_backend_service,
                                  [instance_group])
            for backend_service in additional_backend_services:
                delete_backend_service(gcp, backend_service)
            set_validate_for_proxyless(gcp, True)


def test_timeout(gcp, original_backend_service, instance_group):
    """Verify route-level maxStreamDuration and application deadlines.

    Patches the url map so UnaryCall gets maxStreamDuration=3s, then runs a
    table of client configurations and checks that the observed per-method
    status-code counts match each case's expectations.
    """
    logger.info('Running test_timeout')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [{
        'priority': 0,
        'matchRules': [{
            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
        }],
        'service': original_backend_service.url,
        'routeAction': {
            'maxStreamDuration': {
                'seconds': 3,
            },
        },
    }]
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {expected_results})
    test_cases = [
        (
            'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall timeouts, EmptyCall succeeds.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-4'),
                    (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                     'rpc-behavior', 'sleep-4'),
                ],
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
                'EMPTY_CALL': 0,
            },
        ),
        (
            'app_timeout_exceeded',
            # UnaryCall only with sleep-2; timeout=1s; calls timeout.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-2'),
                ],
                'timeout_sec': 1,
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
            },
        ),
        (
            'timeout_not_exceeded',
            # UnaryCall only with no sleep; calls succeed.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
            },
            {
                'UNARY_CALL': 0,
            },
        )
    ]

    passed = True
    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config. We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            # The first case gets a longer budget: it also absorbs the time
            # the client needs to receive the freshly patched xDS config.
            if first_case:
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for rpc, status in expected_results.items():
                    # Delta of RPCs with this status during this attempt.
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = test_runtime_secs * args.qps
                    # Allow 10% deviation from expectation to reduce flakiness
                    if qty < (want * .9) or qty > (want * 1.1):
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                before_stats = after_stats
            else:
                # for/else: only reached when no attempt succeeded (no break).
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)


def test_fault_injection(gcp, original_backend_service, instance_group):
    """Verify faultInjectionPolicy abort/delay behavior on routed RPCs.

    Installs routes keyed on the 'fi_testcase' request header, each carrying
    a different abort/delay policy, then checks that the observed status-code
    ratios per case match the configured fault percentages.
    """
    logger.info('Running test_fault_injection')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Requests are routed to a fault policy by the value of this header.
    testcase_header = 'fi_testcase'

    def _route(pri, name, fi_policy):
        # Route matching header value `name` to the given fault policy.
        return {
            'priority': pri,
            'matchRules': [{
                'prefixMatch':
                    '/',
                'headerMatches': [{
                    'headerName': testcase_header,
                    'exactMatch': name,
                }],
            }],
            'service': original_backend_service.url,
            'routeAction': {
                'faultInjectionPolicy': fi_policy
            },
        }

    def _abort(pct):
        # Abort `pct` percent of requests with HTTP 401 (UNAUTHENTICATED).
        return {
            'abort': {
                'httpStatus': 401,
                'percentage': pct,
            }
        }

    def _delay(pct):
        # Delay `pct` percent of requests by a fixed 20 seconds.
        return {
            'delay': {
                'fixedDelay': {
                    'seconds': '20'
                },
                'percentage': pct,
            }
        }

    zero_route = _abort(0)
    zero_route.update(_delay(0))
    route_rules = [
        _route(0, 'zero_percent_fault_injection', zero_route),
        _route(1, 'always_delay', _delay(100)),
        _route(2, 'always_abort', _abort(100)),
        _route(3, 'delay_half', _delay(50)),
        _route(4, 'abort_half', _abort(50)),
        {
            'priority': 5,
            'matchRules': [{
                'prefixMatch': '/'
            }],
            'service': original_backend_service.url,
        },
    ]
    set_validate_for_proxyless(gcp, False)
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {code: percent}). Each
    # test case will set the testcase_header with the testcase_name for routing
    # to the appropriate config for the case, defined above.
    test_cases = [
        (
            'zero_percent_fault_injection',
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'non_matching_fault_injection',  # Not in route_rules, above.
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'always_delay',
            {
                'timeout_sec': 2
            },
            {
                4: 1
            },  # DEADLINE_EXCEEDED
        ),
        (
            'always_abort',
            {},
            {
                16: 1
            },  # UNAUTHENTICATED
        ),
        (
            'delay_half',
            {
                'timeout_sec': 2
            },
            {
                4: .5,
                0: .5
            },  # DEADLINE_EXCEEDED / OK: 50% / 50%
        ),
        (
            'abort_half',
            {},
            {
                16: .5,
                0: .5
            },  # UNAUTHENTICATED / OK: 50% / 50%
        )
    ]

    passed = True
    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)

            # All cases use UNARY_CALL carrying the routing header that
            # selects this case's fault policy.
            client_config['metadata'] = [
                (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                 testcase_header, testcase_name)
            ]
            client_config['rpc_types'] = [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            ]
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config. We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
2195 attempt_count = 20 2196 if first_case: 2197 attempt_count = 120 2198 first_case = False 2199 before_stats = get_client_accumulated_stats() 2200 if not before_stats.stats_per_method: 2201 raise ValueError( 2202 'stats.stats_per_method is None, the interop client stats service does not support this test case' 2203 ) 2204 for i in range(attempt_count): 2205 logger.info('%s: attempt %d', testcase_name, i) 2206 2207 test_runtime_secs = 10 2208 time.sleep(test_runtime_secs) 2209 after_stats = get_client_accumulated_stats() 2210 2211 success = True 2212 for status, pct in expected_results.items(): 2213 rpc = 'UNARY_CALL' 2214 qty = (after_stats.stats_per_method[rpc].result[status] - 2215 before_stats.stats_per_method[rpc].result[status]) 2216 want = pct * args.qps * test_runtime_secs 2217 # Allow 10% deviation from expectation to reduce flakiness 2218 VARIANCE_ALLOWED = 0.1 2219 if abs(qty - want) > want * VARIANCE_ALLOWED: 2220 logger.info('%s: failed due to %s[%s]: got %d want ~%d', 2221 testcase_name, rpc, status, qty, want) 2222 success = False 2223 if success: 2224 logger.info('success') 2225 break 2226 logger.info('%s attempt %d failed', testcase_name, i) 2227 before_stats = after_stats 2228 else: 2229 raise Exception( 2230 '%s: timeout waiting for expected results: %s; got %s' % 2231 (testcase_name, expected_results, 2232 after_stats.stats_per_method)) 2233 except Exception: 2234 passed = False 2235 raise 2236 finally: 2237 if passed or not args.halt_after_fail: 2238 patch_url_map_backend_service(gcp, original_backend_service) 2239 set_validate_for_proxyless(gcp, True) 2240 2241 2242def test_csds(gcp, original_backend_service, instance_group, server_uri): 2243 test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds() 2244 sleep_interval_between_attempts_s = datetime.timedelta( 2245 seconds=2).total_seconds() 2246 logger.info('Running test_csds') 2247 2248 logger.info('waiting for original backends to become healthy') 2249 
wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2250 2251 # Test case timeout: 5 minutes 2252 deadline = time.time() + test_csds_timeout_s 2253 cnt = 0 2254 while time.time() <= deadline: 2255 client_config = get_client_xds_config_dump() 2256 logger.info('test_csds attempt %d: received xDS config %s', cnt, 2257 json.dumps(client_config, indent=2)) 2258 if client_config is not None: 2259 # Got the xDS config dump, now validate it 2260 ok = True 2261 try: 2262 if client_config['node']['locality']['zone'] != args.zone: 2263 logger.info('Invalid zone %s != %s', 2264 client_config['node']['locality']['zone'], 2265 args.zone) 2266 ok = False 2267 seen = set() 2268 for xds_config in client_config.get('xds_config', []): 2269 if 'listener_config' in xds_config: 2270 listener_name = xds_config['listener_config'][ 2271 'dynamic_listeners'][0]['active_state']['listener'][ 2272 'name'] 2273 if listener_name != server_uri: 2274 logger.info('Invalid Listener name %s != %s', 2275 listener_name, server_uri) 2276 ok = False 2277 else: 2278 seen.add('lds') 2279 elif 'route_config' in xds_config: 2280 num_vh = len( 2281 xds_config['route_config']['dynamic_route_configs'] 2282 [0]['route_config']['virtual_hosts']) 2283 if num_vh <= 0: 2284 logger.info('Invalid number of VirtualHosts %s', 2285 num_vh) 2286 ok = False 2287 else: 2288 seen.add('rds') 2289 elif 'cluster_config' in xds_config: 2290 cluster_type = xds_config['cluster_config'][ 2291 'dynamic_active_clusters'][0]['cluster']['type'] 2292 if cluster_type != 'EDS': 2293 logger.info('Invalid cluster type %s != EDS', 2294 cluster_type) 2295 ok = False 2296 else: 2297 seen.add('cds') 2298 elif 'endpoint_config' in xds_config: 2299 sub_zone = xds_config["endpoint_config"][ 2300 "dynamic_endpoint_configs"][0]["endpoint_config"][ 2301 "endpoints"][0]["locality"]["sub_zone"] 2302 if args.zone not in sub_zone: 2303 logger.info('Invalid endpoint sub_zone %s', 2304 sub_zone) 2305 ok = False 2306 else: 2307 
seen.add('eds') 2308 for generic_xds_config in client_config.get( 2309 'generic_xds_configs', []): 2310 if re.search(r'\.Listener$', 2311 generic_xds_config['type_url']): 2312 seen.add('lds') 2313 listener = generic_xds_config["xds_config"] 2314 if listener['name'] != server_uri: 2315 logger.info('Invalid Listener name %s != %s', 2316 listener_name, server_uri) 2317 ok = False 2318 elif re.search(r'\.RouteConfiguration$', 2319 generic_xds_config['type_url']): 2320 seen.add('rds') 2321 route_config = generic_xds_config["xds_config"] 2322 if not len(route_config['virtual_hosts']): 2323 logger.info('Invalid number of VirtualHosts %s', 2324 num_vh) 2325 ok = False 2326 elif re.search(r'\.Cluster$', 2327 generic_xds_config['type_url']): 2328 seen.add('cds') 2329 cluster = generic_xds_config["xds_config"] 2330 if cluster['type'] != 'EDS': 2331 logger.info('Invalid cluster type %s != EDS', 2332 cluster_type) 2333 ok = False 2334 elif re.search(r'\.ClusterLoadAssignment$', 2335 generic_xds_config['type_url']): 2336 seen.add('eds') 2337 endpoint = generic_xds_config["xds_config"] 2338 if args.zone not in endpoint["endpoints"][0][ 2339 "locality"]["sub_zone"]: 2340 logger.info('Invalid endpoint sub_zone %s', 2341 sub_zone) 2342 ok = False 2343 want = {'lds', 'rds', 'cds', 'eds'} 2344 if seen != want: 2345 logger.info('Incomplete xDS config dump, seen=%s', seen) 2346 ok = False 2347 except: 2348 logger.exception('Error in xDS config dump:') 2349 ok = False 2350 finally: 2351 if ok: 2352 # Successfully fetched xDS config, and they looks good. 
2353 logger.info('success') 2354 return 2355 logger.info('test_csds attempt %d failed', cnt) 2356 # Give the client some time to fetch xDS resources 2357 time.sleep(sleep_interval_between_attempts_s) 2358 cnt += 1 2359 2360 raise RuntimeError('failed to receive a valid xDS config in %s seconds' % 2361 test_csds_timeout_s) 2362 2363 2364def set_validate_for_proxyless(gcp, validate_for_proxyless): 2365 if not gcp.alpha_compute: 2366 logger.debug( 2367 'Not setting validateForProxy because alpha is not enabled') 2368 return 2369 if len(gcp.global_forwarding_rules) != 1 or len( 2370 gcp.target_proxies) != 1 or len(gcp.url_maps) != 1: 2371 logger.debug( 2372 "Global forwarding rule, target proxy or url map not found.") 2373 return 2374 # This function deletes global_forwarding_rule and target_proxy, then 2375 # recreate target_proxy with validateForProxyless=False. This is necessary 2376 # because patching target_grpc_proxy isn't supported. 2377 delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0]) 2378 delete_target_proxy(gcp, gcp.target_proxies[0]) 2379 create_target_proxy(gcp, target_proxy_name, validate_for_proxyless) 2380 create_global_forwarding_rule(gcp, forwarding_rule_name, [gcp.service_port]) 2381 2382 2383def get_serving_status(instance, service_port): 2384 with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: 2385 health_stub = health_pb2_grpc.HealthStub(channel) 2386 return health_stub.Check(health_pb2.HealthCheckRequest()) 2387 2388 2389def set_serving_status(instances, service_port, serving): 2390 logger.info('setting %s serving status to %s', instances, serving) 2391 for instance in instances: 2392 with grpc.insecure_channel('%s:%d' % 2393 (instance, service_port)) as channel: 2394 logger.info('setting %s serving status to %s', instance, serving) 2395 stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel) 2396 retry_count = 5 2397 for i in range(5): 2398 if serving: 2399 stub.SetServing(empty_pb2.Empty()) 2400 else: 
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        'failed to set instance service status after %d retries'
                        % retry_count)


def is_primary_instance_group(gcp, instance_group):
    # Clients may connect to a TD instance in a different region than the
    # client, in which case primary/secondary assignments may not be based on
    # the client's actual locality.
    # True iff every peer currently receiving RPCs belongs to this group.
    instance_names = get_instance_names(gcp, instance_group)
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    return all(peer in instance_names for peer in stats.rpcs_by_peer.keys())


def get_startup_script(path_to_server_binary, service_port):
    # Returns the VM startup script: either launch a prebuilt server binary,
    # or build and run the grpc-java interop xds-test-server from source.
    if path_to_server_binary:
        return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary,
                                                     service_port)
    else:
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port


def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    # Create a GCE instance template (tagged for health checks, running the
    # startup script) and store it on gcp.instance_template.
    config = {
        'name': name,
        'properties': {
            'tags': {
                'items': ['allow-health-checks']
            },
            'machineType': machine_type,
            'serviceAccounts': [{
                'email': 'default',
                'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
            }],
            'networkInterfaces': [{
                'accessConfigs': [{
                    'type': 'ONE_TO_ONE_NAT'
                }],
                'network': network
            }],
            'disks': [{
                'boot': True,
                'initializeParams': {
                    'sourceImage': source_image
                },
                'autoDelete': True
            }],
            'metadata': {
                'items': [{
                    'key': 'startup-script',
                    'value': startup_script
                }]
            }
        }
    }

    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.instance_template = GcpResource(config['name'], result['targetLink'])


def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group of `size` VMs in `zone`.

    Uses gcp.instance_template, names the gRPC port, waits for the group to
    reach its target size, records it in gcp.instance_groups, and returns it.
    """
    config = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }

    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceGroupManagers().insert(
        project=gcp.project, zone=zone,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, result['name'])
    result = gcp.compute.instanceGroupManagers().get(
        project=gcp.project, zone=zone,
        instanceGroupManager=config['name']).execute(
            num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(config['name'], result['instanceGroup'],
                                   zone)
    gcp.instance_groups.append(instance_group)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return instance_group


def create_health_check(gcp, name):
    # GRPC-type health checks require the alpha API; fall back to TCP checks
    # against the named 'grpc' port otherwise.
    if gcp.alpha_compute:
        config = {
            'name': name,
            'type': 'GRPC',
            'grpcHealthCheck': {
                'portSpecification': 'USE_SERVING_PORT'
            }
        }
        compute_to_use = gcp.alpha_compute
    else:
        config = {
            'name': name,
            'type': 'TCP',
            'tcpHealthCheck': {
                'portName': 'grpc'
            }
        }
        compute_to_use = gcp.compute
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.healthChecks().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check = GcpResource(config['name'], result['targetLink'])


def create_health_check_firewall_rule(gcp, name):
    """Allow TCP ingress from GCP health-checker ranges to tagged VMs."""
    config = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{
            'IPProtocol': 'tcp'
        }],
        # Documented GCP health-check source ranges.
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.firewalls().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check_firewall_rule = GcpResource(config['name'],
                                                 result['targetLink'])


def add_backend_service(gcp, name):
    """Create an INTERNAL_SELF_MANAGED backend service and return it.

    Uses GRPC protocol when alpha compute is available, HTTP2 otherwise.
    The created service is also appended to gcp.backend_services.
    """
    if gcp.alpha_compute:
        protocol = 'GRPC'
        compute_to_use = gcp.alpha_compute
    else:
        protocol = 'HTTP2'
        compute_to_use = gcp.compute
    config = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': protocol
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.backendServices().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    backend_service = GcpResource(config['name'], result['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service


def create_url_map(gcp, name, backend_service, host_name):
    """Create a url map routing `host_name` to `backend_service`."""
    config = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': backend_service.url,
        }],
        'hostRules': [{
            'hosts': [host_name],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    url_map = GcpResource(config['name'], result['targetLink'])
    gcp.url_maps.append(url_map)
    return url_map


def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Rewrite the url map's host rule to 'host:port' using gcp.service_port."""
    config = {
        'hostRules': [{
            'hosts': ['%s:%d' % (host_name, gcp.service_port)],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])


def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None):
    """Create a target proxy pointing at `url_map` (default: first url map).

    Alpha compute creates a targetGrpcProxy (which supports the
    validate_for_proxyless flag); otherwise a targetHttpProxy is created.
    Appends the proxy to gcp.target_proxies and returns it.
    """
    if url_map:
        arg_url_map_url = url_map.url
    else:
        arg_url_map_url = gcp.url_maps[0].url
    if gcp.alpha_compute:
        config = {
            'name': name,
            'url_map': arg_url_map_url,
            'validate_for_proxyless': validate_for_proxyless
        }
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    else:
        config = {
            'name': name,
            'url_map': arg_url_map_url,
        }
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.compute.targetHttpProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    target_proxy = GcpResource(config['name'], result['targetLink'])
    gcp.target_proxies.append(target_proxy)
    return target_proxy


# NOTE(review): potential_ip_addresses uses a mutable list as a default
# argument; it is only read here, never mutated, so it is safe in practice.
def create_global_forwarding_rule(gcp,
                                  name,
                                  potential_ports,
                                  potential_ip_addresses=['0.0.0.0'],
                                  target_proxy=None):
    """Create a forwarding rule, trying each port/IP combination in order.

    On the first success, records the rule and the chosen port
    (gcp.service_port) and returns. NOTE(review): if every combination fails
    the function returns None silently — callers only observe the warnings.
    """
    if target_proxy:
        arg_target_proxy_url = target_proxy.url
    else:
        arg_target_proxy_url = gcp.target_proxies[0].url
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        for ip_address in potential_ip_addresses:
            try:
                config = {
                    'name': name,
                    'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                    'portRange': str(port),
                    'IPAddress': ip_address,
                    'network': args.network,
                    'target': arg_target_proxy_url,
                }
                logger.debug('Sending GCP request with body=%s', config)
                result = compute_to_use.globalForwardingRules().insert(
                    project=gcp.project,
                    body=config).execute(num_retries=_GCP_API_RETRIES)
                wait_for_global_operation(gcp, result['name'])
                global_forwarding_rule = GcpResource(config['name'],
                                                     result['targetLink'])
                gcp.global_forwarding_rules.append(global_forwarding_rule)
                gcp.service_port = port
                return
            except googleapiclient.errors.HttpError as http_error:
                logger.warning(
                    'Got error %s when attempting to create forwarding rule to '
                    '%s:%d. Retrying with another port.' %
                    (http_error, ip_address, port))


def get_health_check(gcp, health_check_name):
    # Look up an existing health check; on failure record the error and keep
    # a placeholder resource (url=None) so cleanup can still proceed by name.
    try:
        result = gcp.compute.healthChecks().get(
            project=gcp.project, healthCheck=health_check_name).execute()
        gcp.health_check = GcpResource(health_check_name, result['selfLink'])
    except Exception as e:
        gcp.errors.append(e)
        gcp.health_check = GcpResource(health_check_name, None)


def get_health_check_firewall_rule(gcp, firewall_name):
    # Look up an existing firewall rule; placeholder with url=None on failure.
    try:
        result = gcp.compute.firewalls().get(project=gcp.project,
                                             firewall=firewall_name).execute()
        gcp.health_check_firewall_rule = GcpResource(firewall_name,
                                                     result['selfLink'])
    except Exception as e:
        gcp.errors.append(e)
        gcp.health_check_firewall_rule = GcpResource(firewall_name, None)


def get_backend_service(gcp, backend_service_name, record_error=True):
    # Look up an existing backend service; always appends a resource (with
    # url=None on failure) and returns it.
    try:
        result = gcp.compute.backendServices().get(
            project=gcp.project, backendService=backend_service_name).execute()
        backend_service = GcpResource(backend_service_name, result['selfLink'])
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
        backend_service = GcpResource(backend_service_name, None)
    gcp.backend_services.append(backend_service)
    return backend_service


def get_url_map(gcp, url_map_name, record_error=True):
    # Look up an existing url map. NOTE(review): unlike get_backend_service,
    # nothing is appended to gcp.url_maps when the lookup fails.
    try:
        result = gcp.compute.urlMaps().get(project=gcp.project,
                                           urlMap=url_map_name).execute()
        url_map = GcpResource(url_map_name, result['selfLink'])
        gcp.url_maps.append(url_map)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)


def get_target_proxy(gcp, target_proxy_name, record_error=True):
    # Look up an existing target proxy (grpc proxy on alpha, http proxy
    # otherwise). Nothing is appended when the lookup fails.
    try:
        if gcp.alpha_compute:
            result = gcp.alpha_compute.targetGrpcProxies().get(
                project=gcp.project,
                targetGrpcProxy=target_proxy_name).execute()
        else:
            result = gcp.compute.targetHttpProxies().get(
                project=gcp.project,
                targetHttpProxy=target_proxy_name).execute()
        target_proxy = GcpResource(target_proxy_name, result['selfLink'])
        gcp.target_proxies.append(target_proxy)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)


def get_global_forwarding_rule(gcp, forwarding_rule_name, record_error=True):
    # Look up an existing forwarding rule; nothing appended on failure.
    try:
        result = gcp.compute.globalForwardingRules().get(
            project=gcp.project, forwardingRule=forwarding_rule_name).execute()
        global_forwarding_rule = GcpResource(forwarding_rule_name,
                                             result['selfLink'])
        gcp.global_forwarding_rules.append(global_forwarding_rule)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)


def get_instance_template(gcp, template_name):
    # Look up an existing instance template; placeholder (url=None) on failure.
    try:
        result = gcp.compute.instanceTemplates().get(
            project=gcp.project, instanceTemplate=template_name).execute()
        gcp.instance_template = GcpResource(template_name, result['selfLink'])
    except Exception as e:
        gcp.errors.append(e)
        gcp.instance_template = GcpResource(template_name, None)


def get_instance_group(gcp, zone, instance_group_name):
    # Look up an existing instance group; also refreshes gcp.service_port
    # from the group's first named port. Placeholder on failure.
    try:
        result = gcp.compute.instanceGroups().get(
            project=gcp.project, zone=zone,
            instanceGroup=instance_group_name).execute()
        gcp.service_port = result['namedPorts'][0]['port']
        instance_group = InstanceGroup(instance_group_name, result['selfLink'],
                                       zone)
    except Exception as e:
        gcp.errors.append(e)
        instance_group = InstanceGroup(instance_group_name, None, zone)
    gcp.instance_groups.append(instance_group)
    return instance_group


def delete_global_forwarding_rule(gcp, forwarding_rule_to_delete=None):
    """Delete one forwarding rule and drop it from gcp's bookkeeping.

    Deletion errors are logged, not raised (best-effort cleanup).
    """
    if not forwarding_rule_to_delete:
        return
    try:
        logger.debug('Deleting forwarding rule %s',
                     forwarding_rule_to_delete.name)
        result = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=forwarding_rule_to_delete.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result['name'])
        if forwarding_rule_to_delete in gcp.global_forwarding_rules:
            gcp.global_forwarding_rules.remove(forwarding_rule_to_delete)
        else:
            logger.debug(
                'Forwarding rule %s does not exist in gcp.global_forwarding_rules',
                forwarding_rule_to_delete.name)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)


def delete_global_forwarding_rules(gcp):
    # Iterate over a copy: delete_global_forwarding_rule mutates the list.
    forwarding_rules_to_delete = gcp.global_forwarding_rules.copy()
    for forwarding_rule in forwarding_rules_to_delete:
        delete_global_forwarding_rule(gcp, forwarding_rule)


def delete_target_proxy(gcp, proxy_to_delete=None):
    """Delete one target proxy (grpc or http flavor) best-effort."""
    if not proxy_to_delete:
        return
    try:
        if gcp.alpha_compute:
            logger.debug('Deleting grpc proxy %s', proxy_to_delete.name)
            result = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project,
                targetGrpcProxy=proxy_to_delete.name).execute(
                    num_retries=_GCP_API_RETRIES)
        else:
            logger.debug('Deleting http proxy %s', proxy_to_delete.name)
            result = gcp.compute.targetHttpProxies().delete(
                project=gcp.project,
                targetHttpProxy=proxy_to_delete.name).execute(
                    num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result['name'])
        if proxy_to_delete in gcp.target_proxies:
            gcp.target_proxies.remove(proxy_to_delete)
        else:
            logger.debug('Gcp proxy %s does not exist in gcp.target_proxies',
                         proxy_to_delete.name)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)


def delete_target_proxies(gcp):
    # Iterate over a copy: delete_target_proxy mutates the list.
    target_proxies_to_delete = gcp.target_proxies.copy()
    for target_proxy in target_proxies_to_delete:
        delete_target_proxy(gcp, target_proxy)


def delete_url_map(gcp, url_map_to_delete=None):
    """Delete one url map best-effort and drop it from gcp.url_maps."""
    if not url_map_to_delete:
        return
    try:
        logger.debug('Deleting url map %s', url_map_to_delete.name)
        result = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=url_map_to_delete.name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result['name'])
        if url_map_to_delete in gcp.url_maps:
            gcp.url_maps.remove(url_map_to_delete)
        else:
            logger.debug('Url map %s does not exist in gcp.url_maps',
                         url_map_to_delete.name)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)


def delete_url_maps(gcp):
    # Iterate over a copy: delete_url_map mutates the list.
    url_maps_to_delete = gcp.url_maps.copy()
    for url_map in url_maps_to_delete:
        delete_url_map(gcp, url_map)


def delete_backend_service(gcp, backend_service):
    """Delete one backend service best-effort (errors logged, not raised)."""
    try:
        logger.debug('Deleting backend service %s', backend_service.name)
        result = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)


def delete_backend_services(gcp):
    # Delete every recorded backend service (the list is not mutated here).
    for backend_service in gcp.backend_services:
        delete_backend_service(gcp, backend_service)


def delete_firewall(gcp):
    """Delete the health-check firewall rule best-effort."""
    try:
        logger.debug('Deleting firewall %s',
                     gcp.health_check_firewall_rule.name)
        result = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)


def delete_health_check(gcp):
    try:
        logger.debug('Deleting health check %s', gcp.health_check.name)
2899 result = gcp.compute.healthChecks().delete( 2900 project=gcp.project, healthCheck=gcp.health_check.name).execute( 2901 num_retries=_GCP_API_RETRIES) 2902 wait_for_global_operation(gcp, result['name']) 2903 except googleapiclient.errors.HttpError as http_error: 2904 logger.info('Delete failed: %s', http_error) 2905 2906 2907def delete_instance_groups(gcp): 2908 for instance_group in gcp.instance_groups: 2909 try: 2910 logger.debug('Deleting instance group %s %s', instance_group.name, 2911 instance_group.zone) 2912 result = gcp.compute.instanceGroupManagers().delete( 2913 project=gcp.project, 2914 zone=instance_group.zone, 2915 instanceGroupManager=instance_group.name).execute( 2916 num_retries=_GCP_API_RETRIES) 2917 wait_for_zone_operation(gcp, 2918 instance_group.zone, 2919 result['name'], 2920 timeout_sec=_WAIT_FOR_BACKEND_SEC) 2921 except googleapiclient.errors.HttpError as http_error: 2922 logger.info('Delete failed: %s', http_error) 2923 2924 2925def delete_instance_template(gcp): 2926 try: 2927 logger.debug('Deleting instance template %s', 2928 gcp.instance_template.name) 2929 result = gcp.compute.instanceTemplates().delete( 2930 project=gcp.project, 2931 instanceTemplate=gcp.instance_template.name).execute( 2932 num_retries=_GCP_API_RETRIES) 2933 wait_for_global_operation(gcp, result['name']) 2934 except googleapiclient.errors.HttpError as http_error: 2935 logger.info('Delete failed: %s', http_error) 2936 2937 2938def patch_backend_service(gcp, 2939 backend_service, 2940 instance_groups, 2941 balancing_mode='UTILIZATION', 2942 max_rate=1, 2943 circuit_breakers=None): 2944 if gcp.alpha_compute: 2945 compute_to_use = gcp.alpha_compute 2946 else: 2947 compute_to_use = gcp.compute 2948 config = { 2949 'backends': [{ 2950 'group': instance_group.url, 2951 'balancingMode': balancing_mode, 2952 'maxRate': max_rate if balancing_mode == 'RATE' else None 2953 } for instance_group in instance_groups], 2954 'circuitBreakers': circuit_breakers, 2955 } 2956 
logger.debug('Sending GCP request with body=%s', config) 2957 result = compute_to_use.backendServices().patch( 2958 project=gcp.project, backendService=backend_service.name, 2959 body=config).execute(num_retries=_GCP_API_RETRIES) 2960 wait_for_global_operation(gcp, 2961 result['name'], 2962 timeout_sec=_WAIT_FOR_BACKEND_SEC) 2963 2964 2965def resize_instance_group(gcp, 2966 instance_group, 2967 new_size, 2968 timeout_sec=_WAIT_FOR_OPERATION_SEC): 2969 result = gcp.compute.instanceGroupManagers().resize( 2970 project=gcp.project, 2971 zone=instance_group.zone, 2972 instanceGroupManager=instance_group.name, 2973 size=new_size).execute(num_retries=_GCP_API_RETRIES) 2974 wait_for_zone_operation(gcp, 2975 instance_group.zone, 2976 result['name'], 2977 timeout_sec=360) 2978 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 2979 new_size, timeout_sec) 2980 2981 2982def patch_url_map_backend_service(gcp, 2983 backend_service=None, 2984 services_with_weights=None, 2985 route_rules=None, 2986 url_map=None): 2987 if url_map: 2988 url_map_name = url_map.name 2989 else: 2990 url_map_name = gcp.url_maps[0].name 2991 '''change url_map's backend service 2992 2993 Only one of backend_service and service_with_weights can be not None. 
2994 ''' 2995 if gcp.alpha_compute: 2996 compute_to_use = gcp.alpha_compute 2997 else: 2998 compute_to_use = gcp.compute 2999 3000 if backend_service and services_with_weights: 3001 raise ValueError( 3002 'both backend_service and service_with_weights are not None.') 3003 3004 default_service = backend_service.url if backend_service else None 3005 default_route_action = { 3006 'weightedBackendServices': [{ 3007 'backendService': service.url, 3008 'weight': w, 3009 } for service, w in services_with_weights.items()] 3010 } if services_with_weights else None 3011 3012 config = { 3013 'pathMatchers': [{ 3014 'name': _PATH_MATCHER_NAME, 3015 'defaultService': default_service, 3016 'defaultRouteAction': default_route_action, 3017 'routeRules': route_rules, 3018 }] 3019 } 3020 logger.debug('Sending GCP request with body=%s', config) 3021 result = compute_to_use.urlMaps().patch( 3022 project=gcp.project, urlMap=url_map_name, 3023 body=config).execute(num_retries=_GCP_API_RETRIES) 3024 wait_for_global_operation(gcp, result['name']) 3025 3026 3027def wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 3028 expected_size, timeout_sec): 3029 start_time = time.time() 3030 while True: 3031 current_size = len(get_instance_names(gcp, instance_group)) 3032 if current_size == expected_size: 3033 break 3034 if time.time() - start_time > timeout_sec: 3035 raise Exception( 3036 'Instance group had expected size %d but actual size %d' % 3037 (expected_size, current_size)) 3038 time.sleep(2) 3039 3040 3041def wait_for_global_operation(gcp, 3042 operation, 3043 timeout_sec=_WAIT_FOR_OPERATION_SEC): 3044 start_time = time.time() 3045 while time.time() - start_time <= timeout_sec: 3046 result = gcp.compute.globalOperations().get( 3047 project=gcp.project, 3048 operation=operation).execute(num_retries=_GCP_API_RETRIES) 3049 if result['status'] == 'DONE': 3050 if 'error' in result: 3051 raise Exception(result['error']) 3052 return 3053 time.sleep(2) 3054 raise 
Exception('Operation %s did not complete within %d' % 3055 (operation, timeout_sec)) 3056 3057 3058def wait_for_zone_operation(gcp, 3059 zone, 3060 operation, 3061 timeout_sec=_WAIT_FOR_OPERATION_SEC): 3062 start_time = time.time() 3063 while time.time() - start_time <= timeout_sec: 3064 result = gcp.compute.zoneOperations().get( 3065 project=gcp.project, zone=zone, 3066 operation=operation).execute(num_retries=_GCP_API_RETRIES) 3067 if result['status'] == 'DONE': 3068 if 'error' in result: 3069 raise Exception(result['error']) 3070 return 3071 time.sleep(2) 3072 raise Exception('Operation %s did not complete within %d' % 3073 (operation, timeout_sec)) 3074 3075 3076def wait_for_healthy_backends(gcp, 3077 backend_service, 3078 instance_group, 3079 timeout_sec=_WAIT_FOR_BACKEND_SEC): 3080 start_time = time.time() 3081 config = {'group': instance_group.url} 3082 instance_names = get_instance_names(gcp, instance_group) 3083 expected_size = len(instance_names) 3084 while time.time() - start_time <= timeout_sec: 3085 for instance_name in instance_names: 3086 try: 3087 status = get_serving_status(instance_name, gcp.service_port) 3088 logger.info('serving status response from %s: %s', 3089 instance_name, status) 3090 except grpc.RpcError as rpc_error: 3091 logger.info('checking serving status of %s failed: %s', 3092 instance_name, rpc_error) 3093 result = gcp.compute.backendServices().getHealth( 3094 project=gcp.project, 3095 backendService=backend_service.name, 3096 body=config).execute(num_retries=_GCP_API_RETRIES) 3097 if 'healthStatus' in result: 3098 logger.info('received GCP healthStatus: %s', result['healthStatus']) 3099 healthy = True 3100 for instance in result['healthStatus']: 3101 if instance['healthState'] != 'HEALTHY': 3102 healthy = False 3103 break 3104 if healthy and expected_size == len(result['healthStatus']): 3105 return 3106 else: 3107 logger.info('no healthStatus received from GCP') 3108 time.sleep(5) 3109 raise Exception('Not all backends became 
healthy within %d seconds: %s' % 3110 (timeout_sec, result)) 3111 3112 3113def get_instance_names(gcp, instance_group): 3114 instance_names = [] 3115 result = gcp.compute.instanceGroups().listInstances( 3116 project=gcp.project, 3117 zone=instance_group.zone, 3118 instanceGroup=instance_group.name, 3119 body={ 3120 'instanceState': 'ALL' 3121 }).execute(num_retries=_GCP_API_RETRIES) 3122 if 'items' not in result: 3123 return [] 3124 for item in result['items']: 3125 # listInstances() returns the full URL of the instance, which ends with 3126 # the instance name. compute.instances().get() requires using the 3127 # instance name (not the full URL) to look up instance details, so we 3128 # just extract the name manually. 3129 instance_name = item['instance'].split('/')[-1] 3130 instance_names.append(instance_name) 3131 logger.info('retrieved instance names: %s', instance_names) 3132 return instance_names 3133 3134 3135def clean_up(gcp): 3136 delete_global_forwarding_rules(gcp) 3137 delete_target_proxies(gcp) 3138 delete_url_maps(gcp) 3139 delete_backend_services(gcp) 3140 if gcp.health_check_firewall_rule: 3141 delete_firewall(gcp) 3142 if gcp.health_check: 3143 delete_health_check(gcp) 3144 delete_instance_groups(gcp) 3145 if gcp.instance_template: 3146 delete_instance_template(gcp) 3147 3148 3149class InstanceGroup(object): 3150 3151 def __init__(self, name, url, zone): 3152 self.name = name 3153 self.url = url 3154 self.zone = zone 3155 3156 3157class GcpResource(object): 3158 3159 def __init__(self, name, url): 3160 self.name = name 3161 self.url = url 3162 3163 3164class GcpState(object): 3165 3166 def __init__(self, compute, alpha_compute, project, project_num): 3167 self.compute = compute 3168 self.alpha_compute = alpha_compute 3169 self.project = project 3170 self.project_num = project_num 3171 self.health_check = None 3172 self.health_check_firewall_rule = None 3173 self.backend_services = [] 3174 self.url_maps = [] 3175 self.target_proxies = [] 3176 
self.global_forwarding_rules = [] 3177 self.service_port = None 3178 self.instance_template = None 3179 self.instance_groups = [] 3180 self.errors = [] 3181 3182 3183logging.debug( 3184 "script start time: %s", 3185 datetime.datetime.now( 3186 datetime.timezone.utc).astimezone().strftime("%Y-%m-%dT%H:%M:%S %Z")) 3187logging.debug("logging local timezone: %s", 3188 datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo) 3189alpha_compute = None 3190if args.compute_discovery_document: 3191 with open(args.compute_discovery_document, 'r') as discovery_doc: 3192 compute = googleapiclient.discovery.build_from_document( 3193 discovery_doc.read()) 3194 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: 3195 with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: 3196 alpha_compute = googleapiclient.discovery.build_from_document( 3197 discovery_doc.read()) 3198else: 3199 compute = googleapiclient.discovery.build('compute', 'v1') 3200 if not args.only_stable_gcp_apis: 3201 alpha_compute = googleapiclient.discovery.build('compute', 'alpha') 3202 3203test_results = {} 3204failed_tests = [] 3205try: 3206 gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num) 3207 gcp_suffix = args.gcp_suffix 3208 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3209 if not args.use_existing_gcp_resources: 3210 if args.keep_gcp_resources: 3211 # Auto-generating a unique suffix in case of conflict should not be 3212 # combined with --keep_gcp_resources, as the suffix actually used 3213 # for GCP resources will not match the provided --gcp_suffix value. 
3214 num_attempts = 1 3215 else: 3216 num_attempts = 5 3217 for i in range(num_attempts): 3218 try: 3219 logger.info('Using GCP suffix %s', gcp_suffix) 3220 create_health_check(gcp, health_check_name) 3221 break 3222 except googleapiclient.errors.HttpError as http_error: 3223 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999)) 3224 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3225 logger.exception('HttpError when creating health check') 3226 if gcp.health_check is None: 3227 raise Exception('Failed to create health check name after %d ' 3228 'attempts' % num_attempts) 3229 firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix 3230 backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix 3231 alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix 3232 extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix 3233 more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix 3234 url_map_name = _BASE_URL_MAP_NAME + gcp_suffix 3235 url_map_name_2 = url_map_name + '2' 3236 service_host_name = _BASE_SERVICE_HOST + gcp_suffix 3237 target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix 3238 target_proxy_name_2 = target_proxy_name + '2' 3239 forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix 3240 forwarding_rule_name_2 = forwarding_rule_name + '2' 3241 template_name = _BASE_TEMPLATE_NAME + gcp_suffix 3242 instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix 3243 same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix 3244 secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix 3245 potential_service_ports = list(args.service_port_range) 3246 random.shuffle(potential_service_ports) 3247 if args.use_existing_gcp_resources: 3248 logger.info('Reusing existing GCP resources') 3249 get_health_check(gcp, health_check_name) 3250 get_health_check_firewall_rule(gcp, firewall_name) 
3251 backend_service = get_backend_service(gcp, backend_service_name) 3252 alternate_backend_service = get_backend_service( 3253 gcp, alternate_backend_service_name) 3254 extra_backend_service = get_backend_service(gcp, 3255 extra_backend_service_name, 3256 record_error=False) 3257 more_extra_backend_service = get_backend_service( 3258 gcp, more_extra_backend_service_name, record_error=False) 3259 get_url_map(gcp, url_map_name) 3260 get_target_proxy(gcp, target_proxy_name) 3261 get_global_forwarding_rule(gcp, forwarding_rule_name) 3262 get_url_map(gcp, url_map_name_2, record_error=False) 3263 get_target_proxy(gcp, target_proxy_name_2, record_error=False) 3264 get_global_forwarding_rule(gcp, 3265 forwarding_rule_name_2, 3266 record_error=False) 3267 get_instance_template(gcp, template_name) 3268 instance_group = get_instance_group(gcp, args.zone, instance_group_name) 3269 same_zone_instance_group = get_instance_group( 3270 gcp, args.zone, same_zone_instance_group_name) 3271 secondary_zone_instance_group = get_instance_group( 3272 gcp, args.secondary_zone, secondary_zone_instance_group_name) 3273 if gcp.errors: 3274 raise Exception(gcp.errors) 3275 else: 3276 create_health_check_firewall_rule(gcp, firewall_name) 3277 backend_service = add_backend_service(gcp, backend_service_name) 3278 alternate_backend_service = add_backend_service( 3279 gcp, alternate_backend_service_name) 3280 create_url_map(gcp, url_map_name, backend_service, service_host_name) 3281 create_target_proxy(gcp, target_proxy_name) 3282 create_global_forwarding_rule(gcp, forwarding_rule_name, 3283 potential_service_ports) 3284 if not gcp.service_port: 3285 raise Exception( 3286 'Failed to find a valid ip:port for the forwarding rule') 3287 if gcp.service_port != _DEFAULT_SERVICE_PORT: 3288 patch_url_map_host_rule_with_port(gcp, url_map_name, 3289 backend_service, 3290 service_host_name) 3291 startup_script = get_startup_script(args.path_to_server_binary, 3292 gcp.service_port) 3293 
create_instance_template(gcp, template_name, args.network, 3294 args.source_image, args.machine_type, 3295 startup_script) 3296 instance_group = add_instance_group(gcp, args.zone, instance_group_name, 3297 _INSTANCE_GROUP_SIZE) 3298 patch_backend_service(gcp, backend_service, [instance_group]) 3299 same_zone_instance_group = add_instance_group( 3300 gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) 3301 secondary_zone_instance_group = add_instance_group( 3302 gcp, args.secondary_zone, secondary_zone_instance_group_name, 3303 _INSTANCE_GROUP_SIZE) 3304 3305 wait_for_healthy_backends(gcp, backend_service, instance_group) 3306 3307 if args.test_case: 3308 client_env = dict(os.environ) 3309 if original_grpc_trace: 3310 client_env['GRPC_TRACE'] = original_grpc_trace 3311 if original_grpc_verbosity: 3312 client_env['GRPC_VERBOSITY'] = original_grpc_verbosity 3313 bootstrap_server_features = [] 3314 3315 if gcp.service_port == _DEFAULT_SERVICE_PORT: 3316 server_uri = service_host_name 3317 else: 3318 server_uri = service_host_name + ':' + str(gcp.service_port) 3319 if args.xds_v3_support: 3320 client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true' 3321 bootstrap_server_features.append('xds_v3') 3322 if args.bootstrap_file: 3323 bootstrap_path = os.path.abspath(args.bootstrap_file) 3324 else: 3325 with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: 3326 bootstrap_file.write( 3327 _BOOTSTRAP_TEMPLATE.format( 3328 node_id='projects/%s/networks/%s/nodes/%s' % 3329 (gcp.project_num, args.network.split('/')[-1], 3330 uuid.uuid1()), 3331 server_features=json.dumps( 3332 bootstrap_server_features)).encode('utf-8')) 3333 bootstrap_path = bootstrap_file.name 3334 client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path 3335 client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true' 3336 client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true' 3337 client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true' 3338 for test_case in args.test_case: 3339 
if test_case in _V3_TEST_CASES and not args.xds_v3_support: 3340 logger.info('skipping test %s due to missing v3 support', 3341 test_case) 3342 continue 3343 if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute: 3344 logger.info('skipping test %s due to missing alpha support', 3345 test_case) 3346 continue 3347 if test_case in [ 3348 'api_listener', 'forwarding_rule_port_match', 3349 'forwarding_rule_default_port' 3350 ] and CLIENT_HOSTS: 3351 logger.info( 3352 'skipping test %s because test configuration is' 3353 'not compatible with client processes on existing' 3354 'client hosts', test_case) 3355 continue 3356 if test_case == 'forwarding_rule_default_port': 3357 server_uri = service_host_name 3358 result = jobset.JobResult() 3359 log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) 3360 if not os.path.exists(log_dir): 3361 os.makedirs(log_dir) 3362 test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) 3363 test_log_file = open(test_log_filename, 'w+') 3364 client_process = None 3365 3366 if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS: 3367 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"' 3368 else: 3369 rpcs_to_send = '--rpc="UnaryCall"' 3370 3371 if test_case in _TESTS_TO_SEND_METADATA: 3372 metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format( 3373 keyE=_TEST_METADATA_KEY, 3374 valueE=_TEST_METADATA_VALUE_EMPTY, 3375 keyU=_TEST_METADATA_KEY, 3376 valueU=_TEST_METADATA_VALUE_UNARY, 3377 keyNU=_TEST_METADATA_NUMERIC_KEY, 3378 valueNU=_TEST_METADATA_NUMERIC_VALUE) 3379 else: 3380 # Setting the arg explicitly to empty with '--metadata=""' 3381 # makes C# client fail 3382 # (see https://github.com/commandlineparser/commandline/issues/412), 3383 # so instead we just rely on clients using the default when 3384 # metadata arg is not specified. 3385 metadata_to_send = '' 3386 3387 # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks 3388 # in the client. 
This means we will ignore intermittent RPC 3389 # failures (but this framework still checks that the final result 3390 # is as expected). 3391 # 3392 # Reason for disabling this is, the resources are shared by 3393 # multiple tests, and a change in previous test could be delayed 3394 # until the second test starts. The second test may see 3395 # intermittent failures because of that. 3396 # 3397 # A fix is to not share resources between tests (though that does 3398 # mean the tests will be significantly slower due to creating new 3399 # resources). 3400 fail_on_failed_rpc = '' 3401 3402 try: 3403 if not CLIENT_HOSTS: 3404 client_cmd_formatted = args.client_cmd.format( 3405 server_uri=server_uri, 3406 stats_port=args.stats_port, 3407 qps=args.qps, 3408 fail_on_failed_rpc=fail_on_failed_rpc, 3409 rpcs_to_send=rpcs_to_send, 3410 metadata_to_send=metadata_to_send) 3411 logger.debug('running client: %s', client_cmd_formatted) 3412 client_cmd = shlex.split(client_cmd_formatted) 3413 client_process = subprocess.Popen(client_cmd, 3414 env=client_env, 3415 stderr=subprocess.STDOUT, 3416 stdout=test_log_file) 3417 if test_case == 'backends_restart': 3418 test_backends_restart(gcp, backend_service, instance_group) 3419 elif test_case == 'change_backend_service': 3420 test_change_backend_service(gcp, backend_service, 3421 instance_group, 3422 alternate_backend_service, 3423 same_zone_instance_group) 3424 elif test_case == 'gentle_failover': 3425 test_gentle_failover(gcp, backend_service, instance_group, 3426 secondary_zone_instance_group) 3427 elif test_case == 'load_report_based_failover': 3428 test_load_report_based_failover( 3429 gcp, backend_service, instance_group, 3430 secondary_zone_instance_group) 3431 elif test_case == 'ping_pong': 3432 test_ping_pong(gcp, backend_service, instance_group) 3433 elif test_case == 'remove_instance_group': 3434 test_remove_instance_group(gcp, backend_service, 3435 instance_group, 3436 same_zone_instance_group) 3437 elif test_case == 
'round_robin': 3438 test_round_robin(gcp, backend_service, instance_group) 3439 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': 3440 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 3441 gcp, backend_service, instance_group, 3442 secondary_zone_instance_group) 3443 elif test_case == 'secondary_locality_gets_requests_on_primary_failure': 3444 test_secondary_locality_gets_requests_on_primary_failure( 3445 gcp, backend_service, instance_group, 3446 secondary_zone_instance_group) 3447 elif test_case == 'traffic_splitting': 3448 test_traffic_splitting(gcp, backend_service, instance_group, 3449 alternate_backend_service, 3450 same_zone_instance_group) 3451 elif test_case == 'path_matching': 3452 test_path_matching(gcp, backend_service, instance_group, 3453 alternate_backend_service, 3454 same_zone_instance_group) 3455 elif test_case == 'header_matching': 3456 test_header_matching(gcp, backend_service, instance_group, 3457 alternate_backend_service, 3458 same_zone_instance_group) 3459 elif test_case == 'circuit_breaking': 3460 test_circuit_breaking(gcp, backend_service, instance_group, 3461 same_zone_instance_group) 3462 elif test_case == 'timeout': 3463 test_timeout(gcp, backend_service, instance_group) 3464 elif test_case == 'fault_injection': 3465 test_fault_injection(gcp, backend_service, instance_group) 3466 elif test_case == 'api_listener': 3467 server_uri = test_api_listener(gcp, backend_service, 3468 instance_group, 3469 alternate_backend_service) 3470 elif test_case == 'forwarding_rule_port_match': 3471 server_uri = test_forwarding_rule_port_match( 3472 gcp, backend_service, instance_group) 3473 elif test_case == 'forwarding_rule_default_port': 3474 server_uri = test_forwarding_rule_default_port( 3475 gcp, backend_service, instance_group) 3476 elif test_case == 'metadata_filter': 3477 test_metadata_filter(gcp, backend_service, instance_group, 3478 alternate_backend_service, 3479 same_zone_instance_group) 
3480 elif test_case == 'csds': 3481 test_csds(gcp, backend_service, instance_group, server_uri) 3482 else: 3483 logger.error('Unknown test case: %s', test_case) 3484 sys.exit(1) 3485 if client_process and client_process.poll() is not None: 3486 raise Exception( 3487 'Client process exited prematurely with exit code %d' % 3488 client_process.returncode) 3489 result.state = 'PASSED' 3490 result.returncode = 0 3491 except Exception as e: 3492 logger.exception('Test case %s failed', test_case) 3493 failed_tests.append(test_case) 3494 result.state = 'FAILED' 3495 result.message = str(e) 3496 if args.halt_after_fail: 3497 # Stop the test suite if one case failed. 3498 raise 3499 finally: 3500 if client_process: 3501 if client_process.returncode: 3502 logger.info('Client exited with code %d' % 3503 client_process.returncode) 3504 else: 3505 client_process.terminate() 3506 test_log_file.close() 3507 # Workaround for Python 3, as report_utils will invoke decode() on 3508 # result.message, which has a default value of ''. 3509 result.message = result.message.encode('UTF-8') 3510 test_results[test_case] = [result] 3511 if args.log_client_output: 3512 logger.info('Client output:') 3513 with open(test_log_filename, 'r') as client_output: 3514 logger.info(client_output.read()) 3515 if not os.path.exists(_TEST_LOG_BASE_DIR): 3516 os.makedirs(_TEST_LOG_BASE_DIR) 3517 report_utils.render_junit_xml_report(test_results, 3518 os.path.join( 3519 _TEST_LOG_BASE_DIR, 3520 _SPONGE_XML_NAME), 3521 suite_name='xds_tests', 3522 multi_target=True) 3523 if failed_tests: 3524 logger.error('Test case(s) %s failed', failed_tests) 3525 sys.exit(1) 3526finally: 3527 keep_resources = args.keep_gcp_resources 3528 if args.halt_after_fail and failed_tests: 3529 logger.info( 3530 'Halt after fail triggered, exiting without cleaning up resources') 3531 keep_resources = True 3532 if not keep_resources: 3533 logger.info('Cleaning up GCP resources. This may take some time.') 3534 clean_up(gcp) 3535