1#!/usr/bin/env python
2# Copyright 2020 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run xDS integration tests on GCP using Traffic Director."""
16
17import argparse
18import datetime
19import json
20import logging
21import os
22import random
23import re
24import shlex
25import socket
26import subprocess
27import sys
28import tempfile
29import time
30import uuid
31
32from google.protobuf import json_format
33import googleapiclient.discovery
34import grpc
35from oauth2client.client import GoogleCredentials
36
37import python_utils.jobset as jobset
38import python_utils.report_utils as report_utils
39from src.proto.grpc.health.v1 import health_pb2
40from src.proto.grpc.health.v1 import health_pb2_grpc
41from src.proto.grpc.testing import empty_pb2
42from src.proto.grpc.testing import messages_pb2
43from src.proto.grpc.testing import test_pb2_grpc
44
# Envoy protos are provided by the PyPI package xds-protos. The generated
# Python modules must be imported so their message descriptors get loaded;
# several of these imports (e.g. the two fault_pb2 modules, which bind the
# same local name) are needed only for that side effect.
47try:
48    from envoy.extensions.filters.common.fault.v3 import fault_pb2
49    from envoy.extensions.filters.http.fault.v3 import fault_pb2
50    from envoy.extensions.filters.http.router.v3 import router_pb2
51    from envoy.extensions.filters.network.http_connection_manager.v3 import \
52        http_connection_manager_pb2
53    from envoy.service.status.v3 import csds_pb2
54    from envoy.service.status.v3 import csds_pb2_grpc
55except ImportError:
56    # These protos are required by CSDS test. We should not fail the entire
57    # script for one test case.
58    pass
59
# Configure the root logger with a single console handler so this script's
# log format is consistent regardless of what the imported libraries did.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python. The original values are kept in
# module globals — presumably restored when launching client subprocesses;
# confirm at the use site (outside this chunk).
original_grpc_trace = os.environ.pop('GRPC_TRACE', None)
original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None)
# Suppress not-essential logs for GCP clients
logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING)
logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)
74
# Test cases enabled by default (selected by --test_case=all).
_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'load_report_based_failover',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
    'path_matching',
    'header_matching',
    'api_listener',
    'forwarding_rule_port_match',
    'forwarding_rule_default_port',
    'metadata_filter',
]

# Valid test cases that are not yet supported by every language, so they are
# not enabled automatically ('all' excludes them); they can still be selected
# explicitly via --test_case.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    'circuit_breaking',
    'timeout',
    'fault_injection',
    'csds',
]

# Test cases that require the V3 API.  Skipped in older runs.
_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])

# Test cases that require the alpha API.  Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])
111
def parse_test_cases(arg):
    """Parse the --test_case flag into an ordered list of test case names.

    Args:
      arg: Raw flag value. May be '' (no tests), a comma-separated list of
        test names, or contain 'all' to select every test in _TEST_CASES.

    Returns:
      The selected test names, ordered as they appear in
      _TEST_CASES + _ADDITIONAL_TEST_CASES.

    Raises:
      Exception: if any requested test case is unknown.
    """
    if arg == '':
        return []
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    test_cases = set()
    # Iterate with a distinct name: the original loop reused 'arg', so the
    # error below could only ever report the last comma-separated token.
    for test_case in arg.split(','):
        if test_case == "all":
            test_cases.update(_TEST_CASES)
        else:
            test_cases.add(test_case)
    if not all(test_case in all_test_cases for test_case in test_cases):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]
127
128
def parse_port_range(port_arg):
    """Parse a single port or an inclusive min:max port range.

    Args:
      port_arg: Either a single port (e.g. '8080') or a range in min:max
        format (e.g. '8080:8280').

    Returns:
      A range covering the requested ports, inclusive of the maximum.

    Raises:
      ValueError: if port_arg is neither an int nor a min:max pair of ints.
    """
    try:
        port = int(port_arg)
    except ValueError:
        # Not a single integer; fall back to the min:max form. The original
        # bare 'except:' also swallowed KeyboardInterrupt/SystemExit.
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)
    return range(port, port + 1)
136
137
# Command-line interface. Flag values are consumed below via the module-level
# 'args' object; several type= converters (parse_test_cases, parse_port_range)
# validate at parse time.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument('--halt_after_fail',
                  action='store_true',
                  help='Halt and save the resources when test failed.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
                  default=_DEFAULT_PORT_RANGE,
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  default=False,
                  action='store_true')
args = argp.parse_args()
278
if args.verbose:
    logger.setLevel(logging.DEBUG)

# Hosts running test client processes; empty means a locally launched client.
CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Each config propagation step in the control plane should finish within 600s;
# otherwise, it indicates a bug in the control plane. Config propagation
# includes all kinds of traffic config update, like updating urlMap, creating
# the resources for the first time, updating BackendService, and changing the
# status of endpoints in BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, slow
# config update could lead to empty EDS or similar symptoms causing the
# connection to hang for a long period of time. So, we want to extend the stats
# wait time to be the same as urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# Two-phase substitution: the %-placeholders (network, zone, xDS server) are
# filled in right here at module load; {node_id} and {server_features} remain
# as str.format() placeholders (hence the {{ }} escaping of JSON braces),
# presumably filled per-client later — confirm at the use site.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend
# service, which can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
# Base names for generated GCP resources (suffixed per run via --gcp_suffix).
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
357
358
def get_client_stats(num_rpcs, timeout_sec):
    """Fetch load-balancer stats from the first configured client host.

    Args:
      num_rpcs: Number of RPCs the client should sample for the stats.
      timeout_sec: Stats collection deadline passed to the client.

    Returns:
      The LoadBalancerStatsResponse from the first host.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            response = stub.GetClientStats(
                request,
                wait_for_ready=True,
                timeout=timeout_sec + _CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host,
                         json_format.MessageToJson(response))
            # Only the first host is ever queried; the return short-circuits
            # the loop exactly as in the original implementation.
            return response
380
381
def get_client_accumulated_stats():
    """Fetch accumulated RPC stats from the first configured client host.

    Returns:
      The LoadBalancerAccumulatedStatsResponse from the first host.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            # Only the first host is ever queried (loop mirrors the
            # multi-host configuration shape used elsewhere).
            return response
399
400
def get_client_xds_config_dump():
    """Fetch the client's xDS config dump via the CSDS service.

    Returns:
      The single ClientConfig converted to a dict (many fields arrive packed
      in google.protobuf.Any, so the JSON form is far easier to inspect than
      unpacking each proto), or None if the response does not contain exactly
      one ClientConfig.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        server_address = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(server_address) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            logger.debug('Fetching xDS config dump from %s', server_address)
            response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
                                              wait_for_ready=True,
                                              timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Fetched xDS config dump from %s', server_address)
            if len(response.config) != 1:
                logger.error('Unexpected number of ClientConfigs %d: %s',
                             len(response.config), response)
                return None
            return json_format.MessageToDict(
                response.config[0], preserving_proto_field_name=True)
425
426
def configure_client(rpc_types, metadata=(), timeout_sec=None):
    """Reconfigure the running test client's RPC behavior.

    Args:
      rpc_types: Iterable of RPC types the client should send.
      metadata: Iterable of (rpc_type, key, value) tuples attached to the
        corresponding RPC type. Defaults to an empty tuple — the previous
        mutable-list default ([]) was a Python anti-pattern, though it is
        backward compatible since the value is only iterated.
      timeout_sec: Optional per-RPC timeout to set on the client.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
453
454
class RpcDistributionError(Exception):
    """Raised when RPCs fail to reach the expected set of backends in time."""
    pass
457
458
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    """Poll client stats until exactly the given backends receive load.

    Args:
      backends: List of backend instance names expected to receive RPCs.
      timeout_sec: Deadline for the expected distribution to appear.
      num_rpcs: Number of RPCs sampled per stats request.
      allow_failures: If False, any failed RPC counts as an error.

    Raises:
      RpcDistributionError: if the deadline passes while some expected
        backend got no load, an unexpected backend got load, or (when
        allow_failures is False) any RPC failed.
    """
    start_time = time.time()
    error_msg = None
    # Lazy %-style args: the message is only formatted when DEBUG is enabled
    # (the original eagerly formatted the string on every call).
    logger.debug('Waiting for %d sec until backends %s receive load',
                 timeout_sec, backends)
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = 'Backend %s did not receive load' % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
480
481
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    """Wait for all load to reach the given backends, tolerating RPC failures."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=True)
489
490
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    """Wait for all load to reach the given backends; any RPC failure is an error."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=False)
498
499
def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
    """Poll client stats until none of the given backends receives load.

    Raises:
      Exception: if any listed backend still receives RPCs at the deadline.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        # Success as soon as a sample shows no listed backend getting load.
        if not any(backend in rpcs_by_peer for backend in backends):
            return
    raise Exception('Unexpected RPCs going to given backends')
513
514
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until roughly num_rpcs RPCs of rpc_type are stably outstanding.

    Args:
      rpc_type: RPC method to check; either 'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait for the desired state.
      num_rpcs: Expected number of in-flight RPCs.
      threshold: Tolerance in percent (within [0,100]) by which the actual
        in-flight count may differ from num_rpcs.

    Raises:
      ValueError: if threshold is outside [0, 100].
      Exception: if the in-flight count never settles near num_rpcs.
    """
    if not 0 <= threshold <= 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    deadline = time.time() + timeout_sec
    error_msg = None
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight',
        timeout_sec, num_rpcs, rpc_type, threshold)
    while time.time() <= deadline:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if not error_msg:
            break
        logger.debug('Progress: %s', error_msg)
        time.sleep(2)
    # The target was hit once; re-check after a pause to make sure the number
    # of outstanding RPCs is stable, not momentarily correct.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
552
553
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Return an error string if the in-flight count is out of tolerance.

    Returns:
      None when started - succeeded - failed is within
      num_rpcs * (1 +/- threshold_fraction); otherwise a description of the
      deviation.
    """
    stats = get_client_accumulated_stats()
    started = stats.num_rpcs_started_by_method[rpc_type]
    succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    failed = stats.num_rpcs_failed_by_method[rpc_type]
    in_flight = started - succeeded - failed
    if in_flight < num_rpcs * (1 - threshold_fraction):
        return ('actual(%d) < expected(%d - %d%%)' %
                (in_flight, num_rpcs, threshold))
    if in_flight > num_rpcs * (1 + threshold_fraction):
        return ('actual(%d) > expected(%d + %d%%)' %
                (in_flight, num_rpcs, threshold))
    return None
568
569
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Check that two distributions agree within a percentage threshold.

    Args:
      actual_distribution: A list of floats, the observed distribution.
      expected_distribution: A list of floats, the expected distribution.
      threshold: Number within [0,100]; each actual value may deviate from
        its expected counterpart by at most this percentage.

    Returns:
      True if every actual value lies within the threshold of the
      corresponding expected value.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: with details, if the sizes differ or a value is out of
        range.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (len(expected_distribution), len(actual_distribution)))
    if not 0 <= threshold <= 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        lower_bound = expected * (1 - fraction)
        upper_bound = expected * (1 + fraction)
        if actual < lower_bound:
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > upper_bound:
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
604
605
def compare_expected_instances(stats, expected_instances):
    """Check that each RPC type was served by exactly the expected instances.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict keyed by RPC type (string), with the list of
        expected backend instance names (strings) as values.

    Returns:
      True if every RPC type's peers match expectations, False otherwise.
    """
    for rpc_type, expected_peers in expected_instances.items():
        by_method = stats.rpcs_by_method[rpc_type]
        rpcs_by_peer = by_method.rpcs_by_peer if by_method else None
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        # Order does not matter, only the set of serving instances.
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
627
628
def test_backends_restart(gcp, backend_service, instance_group):
    """Verify the client recovers after all backends go away and come back.

    Scales the instance group down to zero (expecting no backend to receive
    load), then restores the original size and waits for traffic to reach
    the new instances.

    Args:
      gcp: Object holding the test's GCP resource state.
      backend_service: Backend service under test.
      instance_group: Instance group to scale down and restore.
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    # Note: the original assigned an unused 'start_time' here; removed.
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        # Always restore the group, even if the zero-backend check failed.
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
646
647
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Switch the URL map to an alternate backend service; verify traffic follows.

    Restores the original routing afterwards unless the test failed and
    --halt_after_fail was requested.
    """
    logger.info('Running test_change_backend_service')
    original_backends = get_instance_names(gcp, instance_group)
    alternate_backends = get_instance_names(gcp, same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(original_backends,
                                             _WAIT_FOR_STATS_SEC)
    passed = True
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_backends,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    except Exception:
        passed = False
        raise
    finally:
        # Undo the routing change unless we are halting to preserve state
        # for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
674
675
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Verify failover to the secondary locality when most primaries stop.

    Stops serving on all but one primary instance, then expects traffic to
    flow to the single remaining primary plus the secondary instances.

    Args:
      gcp: Object holding the test's GCP resource state; gcp.service_port
        is read here.
      backend_service: Backend service patched with both instance groups.
      primary_instance_group: Instance group for the primary locality.
      secondary_instance_group: Instance group for the secondary locality.
      swapped_primary_and_secondary: True only on the single recursive retry
        where the primary/secondary expectations are exchanged; guards
        against infinite recursion.
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    passed = True
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop all primaries except one; >50% of the locality failing should
        # trigger gentle failover to the secondary locality.
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            # Re-enable serving on every primary regardless of outcome.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original backend configuration and group size unless
        # halting to preserve state for debugging.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
            resize_instance_group(gcp, primary_instance_group,
                                  num_primary_instances)
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
737
738
def test_load_report_based_failover(gcp, backend_service,
                                    primary_instance_group,
                                    secondary_instance_group):
    """Verify that RATE balancing mode shifts traffic based on load.

    Capping the localities below the client QPS should force spillover into
    the secondary locality; raising the cap above the client QPS should pull
    all traffic back to the primary locality only.
    """
    logger.info('Running test_load_report_based_failover')
    passed = True
    try:
        both_groups = [primary_instance_group, secondary_instance_group]
        patch_backend_service(gcp, backend_service, both_groups)
        primary_names = get_instance_names(gcp, primary_instance_group)
        secondary_names = get_instance_names(gcp, secondary_instance_group)
        for group in both_groups:
            wait_for_healthy_backends(gcp, backend_service, group)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Each scenario is (QPS numerator, backends expected to serve):
        #   - max_rate at 20% of client QPS: the secondary locality must
        #     absorb the overflow, so both localities serve.
        #   - max_rate at 120% of client QPS: the primary locality alone
        #     handles everything.
        scenarios = [
            (1, primary_names + secondary_names),
            (6, primary_names),
        ]
        for numerator, expected_backends in scenarios:
            max_rate = int(args.qps * numerator / 5)
            logger.info('Patching backend service to RATE with %d max_rate',
                        max_rate)
            patch_backend_service(gcp,
                                  backend_service,
                                  both_groups,
                                  balancing_mode='RATE',
                                  max_rate=max_rate)
            wait_until_all_rpcs_go_to_given_backends(expected_backends,
                                                     _WAIT_FOR_BACKEND_SEC)
        logger.info("success")
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the default single-group configuration.
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
793
794
def test_ping_pong(gcp, backend_service, instance_group):
    """Smoke test: every backend in the instance group receives RPCs."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    backends = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(backends, _WAIT_FOR_STATS_SEC)
801
802
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Verify traffic fails over when a serving instance group is removed.

    Attaches both instance groups to the backend service, determines which
    group(s) actually receive traffic, removes a serving group, and checks
    that all RPCs shift to the remaining group.
    """
    logger.info('Running test_remove_instance_group')
    passed = True
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(gcp,
                                                      same_zone_instance_group)
        try:
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                # Only instance_group serves: remove it, keep the same-zone
                # group.
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                # Only same_zone_instance_group serves: remove it instead.
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the default single-group configuration.
            patch_backend_service(gcp, backend_service, [instance_group])
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
851
852
def test_round_robin(gcp, backend_service, instance_group):
    """Verify RPCs are distributed evenly (round-robin) across backends."""
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests may
    # briefly yield an empty EDS update and failed RPCs, so the distribution
    # check is retried. Long-term fix: create fresh backend resources per
    # test case. Each attempt takes ~10s; propagation can take minutes.
    max_attempts = 40
    for _ in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        total_requests_received = sum(
            stats.rpcs_by_peer[peer] for peer in stats.rpcs_by_peer)
        if total_requests_received != _NUM_TEST_RPCS:
            # Some RPCs failed outright; retry rather than judge skew.
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            deviation = abs(stats.rpcs_by_peer[instance] - expected_requests)
            if deviation > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than %d '
                    'for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
883
884
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify the secondary locality stays idle on partial primary failure.

    Stops serving on one primary instance; while at least one primary
    instance remains serving, all traffic should stay within the primary
    locality.

    Args:
      swapped_primary_and_secondary: True when retrying with the two groups'
        roles swapped (TD may consider the other zone primary); guards
        against infinite recursion.
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the wait fails.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        # Mark failure for any unexpected error so that, with
        # --halt_after_fail, the finally block leaves resources untouched for
        # debugging (consistent with test_gentle_failover).
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
934
935
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify traffic fails over to the secondary locality on full primary failure.

    Stops serving on all primary instances and checks that traffic moves
    entirely to the secondary locality.

    Args:
      swapped_primary_and_secondary: True when retrying with the two groups'
        roles swapped (TD may consider the other zone primary); guards
        against infinite recursion.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the wait fails.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        # Mark failure for any unexpected error so that, with
        # --halt_after_fail, the finally block leaves resources untouched for
        # debugging (consistent with test_gentle_failover).
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
983
984
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Prepare original/alternate backend services for url-map test cases.

    Waits for both services' backends to become healthy, then confirms all
    traffic starts out on the original backend service.

    Returns:
      Tuple of (original, alternate) backend instance-name lists.
    """
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Attach the same-zone group to the alternate service so url-map rules
    # can route traffic to it.
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_names = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_names)
    alternate_names = get_instance_names(gcp, same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_names)

    # Start with all traffic going to original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_names,
                                             _WAIT_FOR_STATS_SEC)
    return original_names, alternate_names
1016
1017
def test_metadata_filter(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify metadataFilters in url-map route rules select the right service.

    Builds filterLabels from the client's xDS bootstrap node metadata and
    checks that MATCH_ALL / MATCH_ANY criteria route all traffic to the
    alternate backend service (and none to the original) for each rule set.
    """
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    passed = True
    try:
        # Labels matching this client's node metadata, read from the xDS
        # bootstrap file (bootstrap_path is presumably set at module level —
        # not visible here).
        with open(bootstrap_path) as f:
            md = json.load(f)['node']['metadata']
            match_labels = []
            for k, v in md.items():
                match_labels.append({'name': k, 'value': v})

        # A label set guaranteed not to match the client's metadata.
        not_match_labels = [{'name': 'fake', 'value': 'fail'}]
        # Each rule set routes matching traffic to alternate_backend_service
        # and non-matching traffic to original_backend_service; the test
        # expects all RPCs to land on the alternate service.
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY
            # test MATCH_ALL: super set labels won't match
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test MATCH_ANY
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test match multiple route rules
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
            ]
        ]

        for route_rules in test_route_rules:
            # Start each scenario from the default config (all traffic on the
            # original service), apply the rules, then expect traffic to move
            # entirely to the alternate service before resetting the url map.
            wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)
            wait_until_no_rpcs_go_to_given_backends(original_backend_instances,
                                                    _WAIT_FOR_STATS_SEC)
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp, original_backend_service)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Detach the alternate service's backends to restore defaults.
            patch_backend_service(gcp, alternate_backend_service, [])
1167
1168
def test_api_listener(gcp, backend_service, instance_group,
                      alternate_backend_service):
    """Verify two API listeners sharing a host name can coexist.

    Creates a second url-map + target-proxy + forwarding-rule suite with the
    same host name as the original, confirms traffic still flows, deletes
    the original suite and confirms the second keeps serving, then removes
    the original host rule and expects traffic to stop.

    Returns:
      The server URI the client should use after resources are restored (or
      None when restoration is skipped because of --halt_after_fail).
    """
    logger.info("Running api_listener")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        # create a second suite of map+tp+fr with the same host name in host rule
        # and we have to disable proxyless validation because it needs `0.0.0.0`
        # ip address in fr for proxyless and also we violate ip:port uniqueness
        # for test purpose. See https://github.com/grpc/grpc-java/issues/8009
        new_config_suffix = '2'
        url_map_2 = create_url_map(gcp, url_map_name + new_config_suffix,
                                   backend_service, service_host_name)
        target_proxy_2 = create_target_proxy(
            gcp, target_proxy_name + new_config_suffix, False, url_map_2)
        if not gcp.service_port:
            # Fixed typo in error message ("Faied" -> "Failed").
            raise Exception(
                'Failed to find a valid port for the forwarding rule')
        # Offer several random candidate IPs since ip:port uniqueness is
        # deliberately violated above.
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp,
                                      forwarding_rule_name + new_config_suffix,
                                      [gcp.service_port],
                                      potential_ip_addresses, target_proxy_2)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp,
                                              url_map_name + new_config_suffix,
                                              backend_service,
                                              service_host_name)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # Delete the original suite; the second listener should keep serving.
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
                              args.qps)
        for i in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
        # delete host rule for the original host name
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)

    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the original single-listener configuration.
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            # NOTE(review): returning from `finally` suppresses any in-flight
            # exception whenever this branch runs — confirm this is intended
            # (the same pattern appears in the other forwarding-rule tests).
            return server_uri
1241
1242
def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    """Verify RPCs stop when no forwarding rule matches the service port.

    Replaces the forwarding rule with one covering every port in the default
    range except the service port, then expects traffic to drain. Restores
    the original rule afterwards and returns the server URI to use.
    """
    logger.info("Running test_forwarding_rule_port_match")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        delete_global_forwarding_rules(gcp)
        # Every port in the default range except the one the client targets.
        non_matching_ports = [
            port for port in parse_port_range(_DEFAULT_PORT_RANGE)
            if port != gcp.service_port
        ]
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      non_matching_ports)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the original forwarding rule and recompute the URI.
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port == _DEFAULT_SERVICE_PORT:
                server_uri = service_host_name
            else:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            return server_uri
1274
1275
def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    """Check url-map/forwarding-rule port handling for the default port.

    Exercises several configurations: a port-range forwarding rule combined
    with a host rule that specifies a port (RPCs expected to fail), then a
    port-80 rule with a portless host rule (RPCs expected to succeed), and
    finally a host rule with an explicit port while the client URI has none
    (RPCs expected to fail again).

    Returns:
      The server URI the client should use after resources are restored (or
      None when restoration is skipped because of --halt_after_fail).
    """
    logger.info("Running test_forwarding_rule_default_port")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            # Replace the forwarding rule with one spanning the whole default
            # port range and add an explicit port to the host rule.
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          parse_port_range(_DEFAULT_PORT_RANGE))
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        # Expect failure here — presumably because the client URI and the
        # host rule now disagree on the port (TODO confirm against client
        # setup elsewhere in this file).
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
        # expect success when no port in client request service uri, and no port in url-map
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name, False)
        # Offer several random candidate IPs for the port-80 forwarding rule.
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
                                      potential_ip_addresses)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # expect failure when no port in client request uri, but specify port in url-map
        patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
                                          service_host_name)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the original url-map/proxy/forwarding-rule suite and
            # recompute the server URI for subsequent tests.
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            # NOTE(review): returning from `finally` suppresses any in-flight
            # exception whenever this branch runs — confirm this is intended.
            return server_uri
1335
1336
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify weighted traffic splitting between two backend services.

    Starts with all traffic going to original_backend_service, then patches
    the URL map so the default action splits traffic 20/80 between the
    original and alternate services. Waits for every backend in both
    services to receive traffic, then checks that the observed per-instance
    distribution matches the configured weights.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # Reconfigure the URL map: the default action becomes a weighted
        # split across the two services.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Per-instance expectation: each service's weight is divided evenly
        # among its instances, e.g. [20, 80] -> [10, 10, 40, 40].
        expected_instance_percentage = (
            [original_service_percentage /
             float(len(original_backend_instances))] *
            len(original_backend_instances) +
            [alternate_service_percentage /
             float(len(alternate_backend_instances))] *
            len(alternate_backend_instances))

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
        # seconds timeout.
        retry_count = 10
        for attempt in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            observed_counts = [
                stats.rpcs_by_peer[name] for name in original_backend_instances
            ] + [
                stats.rpcs_by_peer[name]
                for name in alternate_backend_instances
            ]
            total = sum(observed_counts)
            observed_percentages = [
                count * 100.0 / total for count in observed_counts
            ]

            try:
                compare_distributions(observed_percentages,
                                      expected_instance_percentage, 5)
            except Exception as err:
                logger.info('attempt %d', attempt)
                logger.info('got percentage: %s', observed_percentages)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(err)
                if attempt == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (observed_percentages, expected_instance_percentage))
            else:
                logger.info("success")
                break
    except Exception:
        passed = False
        raise
    finally:
        # Restore the URL map and drain the alternate service unless the
        # operator asked to halt on failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1413
1414
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify URL-map path matching (full path, prefix, regex, ignoreCase).

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then updates the URL map to add routes that
    send UnaryCall and EmptyCall to different backends, waits for all
    backends in both services to receive traffic, and verifies that traffic
    goes to the expected backends.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Regex UnaryCall -> alternate_backend_service.
                    # NOTE: raw string so the literal backslashes reach the
                    # RE2 pattern without Python invalid-escape warnings.
                    'matchRules': [{
                        'regexMatch':
                            r'^\/.*\/UnaryCall$'  # Unary methods with any services.
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # ignoreCase EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        # Case insensitive matching.
                        'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
                        'ignoreCase': True,
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        # Restore the URL map and drain the alternate service unless the
        # operator asked to halt on failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1558
1559
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify URL-map header matching (exact, prefix, suffix, present,
    invert, range, regex).

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then updates the URL map to add routes that
    steer RPCs carrying the test metadata to different backends, waits for
    all backends in both services to receive traffic, and verifies that
    traffic goes to the expected backends. The interop client attaches
    _TEST_METADATA_KEY/_TEST_METADATA_NUMERIC_KEY values per RPC type, which
    is what each case matches on.
    """
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # Header ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header PrefixMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header SuffixMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' present -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'presentMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header invert ExactMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to
                    # original. EmptyCall will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_UNARY,
                            'invertMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
                    # UnaryCall is sent with the metadata in range.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'rangeMatch': {
                                'rangeStart': '100',
                                'rangeEnd': '200'
                            }
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header RegexMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName':
                                _TEST_METADATA_KEY,
                            'regexMatch':
                                "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
                                              _TEST_METADATA_VALUE_EMPTY[-2:])
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
        ]

        # Apply each rule set in turn and verify the resulting distribution.
        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        # Restore the URL map and drain the alternate service unless the
        # operator asked to halt on failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1761
1762
def test_circuit_breaking(gcp, original_backend_service, instance_group,
                          same_zone_instance_group):
    '''
    Verify that circuit-breaker maxRequests limits concurrent RPCs.

    Since backend service circuit_breakers configuration cannot be unset,
    which causes trouble for restoring validate_for_proxy flag in target
    proxy/global forwarding rule, this test uses dedicated backend services.
    The url_map and backend services undergo the following state changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    '''
    logger.info('Running test_circuit_breaking')
    # Track services created here so the finally block can delete them.
    additional_backend_services = []
    passed = True
    try:
        # TODO(chengyuanzhang): Dedicated backend services created for circuit
        # breaking test. Once the issue for unsetting backend service circuit
        # breakers is resolved or configuring backend service circuit breakers is
        # enabled for config validation, these dedicated backend services can be
        # eliminated.
        extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
        more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
        extra_backend_service = add_backend_service(gcp,
                                                    extra_backend_service_name)
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name)
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless doesn't allow setting
        # circuit_breakers. Disable validate_for_proxyless
        # for this test. This can be removed when validation
        # accepts circuit_breakers.
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        # Each service gets a distinct maxRequests cap so the two RPC types
        # can be verified independently against their own thresholds.
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backend to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        # Split RPC types across the two capped services so each cap is
        # exercised by exactly one method.
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open. RPCs then accumulate in flight until the
        # circuit breaker caps them at maxRequests.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increment circuit breakers max_requests threshold. In-flight RPCs
        # should rise to the new cap.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
    except Exception:
        passed = False
        raise
    finally:
        # Restore original routing, delete the dedicated services, and
        # re-enable proxyless validation unless halting on failure.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, original_backend_service,
                                  [instance_group])
            for backend_service in additional_backend_services:
                delete_backend_service(gcp, backend_service)
            set_validate_for_proxyless(gcp, True)
1922
1923
def test_timeout(gcp, original_backend_service, instance_group):
    """Verify that maxStreamDuration-derived deadlines are enforced.

    Routes UnaryCall through an action carrying maxStreamDuration:3s (EmptyCall
    keeps the default), then drives several client configurations — slow
    backends, an explicit application timeout, and a no-sleep control — and
    checks that the accumulated per-method status counts match the expected
    gRPC status codes at roughly the configured QPS.
    """
    logger.info('Running test_timeout')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [{
        'priority': 0,
        'matchRules': [{
            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
        }],
        'service': original_backend_service.url,
        'routeAction': {
            'maxStreamDuration': {
                'seconds': 3,
            },
        },
    }]
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {expected_results})
    test_cases = [
        (
            'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall timeouts, EmptyCall succeeds.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-4'),
                    (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                     'rpc-behavior', 'sleep-4'),
                ],
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
                'EMPTY_CALL': 0,
            },
        ),
        (
            'app_timeout_exceeded',
            # UnaryCall only with sleep-2; timeout=1s; calls timeout.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-2'),
                ],
                'timeout_sec': 1,
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
            },
        ),
        (
            'timeout_not_exceeded',
            # UnaryCall only with no sleep; calls succeed.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
            },
            {
                'UNARY_CALL': 0,
            },
        )
    ]

    passed = True
    try:
        for case_num, (testcase_name, client_config,
                       expected_results) in enumerate(test_cases):
            logger.info('starting case %s', testcase_name)
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout. The very first case gets extra attempts to
            # absorb initial config propagation delay.
            attempt_count = 120 if case_num == 0 else 20
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for attempt in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, attempt)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                # Compare the delta of each expected (rpc, status) counter
                # against the RPC volume implied by the configured QPS.
                matched = True
                for rpc, status in expected_results.items():
                    delta = (after_stats.stats_per_method[rpc].result[status] -
                             before_stats.stats_per_method[rpc].result[status])
                    want = test_runtime_secs * args.qps
                    # Allow 10% deviation from expectation to reduce flakiness
                    if not want * .9 <= delta <= want * 1.1:
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, delta, want)
                        matched = False
                if matched:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, attempt)
                before_stats = after_stats
            else:
                # Loop exhausted without a matching window.
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
2055
2056
def test_fault_injection(gcp, original_backend_service, instance_group):
    """Verify Traffic Director fault-injection policies reach the client.

    Installs route rules that key off a request header (``fi_testcase``) so
    each test case can select its own fault-injection policy (abort, delay,
    both at 0%, or 50/50 mixes), then checks that the client's accumulated
    RPC status counts match the expected failure percentages.

    Args:
        gcp: the GcpState holding compute clients and created resources.
        original_backend_service: backend service to route all cases to.
        instance_group: instance group whose backends must be healthy first.

    Raises:
        Exception: if any test case does not reach its expected result
            distribution before its attempt budget is exhausted.
    """
    logger.info('Running test_fault_injection')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Header used by the route rules below to pick a fault-injection policy
    # per test case; the client is configured to send it on every UNARY_CALL.
    testcase_header = 'fi_testcase'

    def _route(pri, name, fi_policy):
        # Route rule matching requests whose testcase_header equals `name`,
        # applying the given faultInjectionPolicy.
        return {
            'priority': pri,
            'matchRules': [{
                'prefixMatch':
                    '/',
                'headerMatches': [{
                    'headerName': testcase_header,
                    'exactMatch': name,
                }],
            }],
            'service': original_backend_service.url,
            'routeAction': {
                'faultInjectionPolicy': fi_policy
            },
        }

    def _abort(pct):
        # Abort `pct` percent of requests with HTTP 401 (maps to gRPC
        # UNAUTHENTICATED, status code 16).
        return {
            'abort': {
                'httpStatus': 401,
                'percentage': pct,
            }
        }

    def _delay(pct):
        # Delay `pct` percent of requests by 20s — longer than any client
        # timeout used below, so delayed RPCs hit DEADLINE_EXCEEDED.
        return {
            'delay': {
                'fixedDelay': {
                    'seconds': '20'
                },
                'percentage': pct,
            }
        }

    # A policy with both abort and delay present but at 0%: nothing should
    # actually be injected.
    zero_route = _abort(0)
    zero_route.update(_delay(0))
    route_rules = [
        _route(0, 'zero_percent_fault_injection', zero_route),
        _route(1, 'always_delay', _delay(100)),
        _route(2, 'always_abort', _abort(100)),
        _route(3, 'delay_half', _delay(50)),
        _route(4, 'abort_half', _abort(50)),
        {
            # Catch-all route (no fault injection) for non-matching headers.
            'priority': 5,
            'matchRules': [{
                'prefixMatch': '/'
            }],
            'service': original_backend_service.url,
        },
    ]
    # Fault injection requires a target proxy without proxyless validation.
    set_validate_for_proxyless(gcp, False)
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {code: percent}).  Each
    # test case will set the testcase_header with the testcase_name for routing
    # to the appropriate config for the case, defined above.
    test_cases = [
        (
            'zero_percent_fault_injection',
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'non_matching_fault_injection',  # Not in route_rules, above.
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'always_delay',
            {
                'timeout_sec': 2
            },
            {
                4: 1
            },  # DEADLINE_EXCEEDED
        ),
        (
            'always_abort',
            {},
            {
                16: 1
            },  # UNAUTHENTICATED
        ),
        (
            'delay_half',
            {
                'timeout_sec': 2
            },
            {
                4: .5,
                0: .5
            },  # DEADLINE_EXCEEDED / OK: 50% / 50%
        ),
        (
            'abort_half',
            {},
            {
                16: .5,
                0: .5
            },  # UNAUTHENTICATED / OK: 50% / 50%
        )
    ]

    passed = True
    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)

            # Send the routing header on unary calls only; restrict the client
            # to UNARY_CALL so all traffic goes through the selected route.
            client_config['metadata'] = [
                (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                 testcase_header, testcase_name)
            ]
            client_config['rpc_types'] = [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            ]
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            if first_case:
                # The first case also absorbs xDS config propagation delay, so
                # give it a much larger attempt budget.
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                # Compare the per-status deltas over this window against the
                # expected percentage of total QPS.
                for status, pct in expected_results.items():
                    rpc = 'UNARY_CALL'
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = pct * args.qps * test_runtime_secs
                    # Allow 10% deviation from expectation to reduce flakiness
                    VARIANCE_ALLOWED = 0.1
                    if abs(qty - want) > want * VARIANCE_ALLOWED:
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                # Measure the next window relative to this one's end.
                before_stats = after_stats
            else:
                # for/else: reached only when no attempt broke out early.
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map and proxy unless the operator asked to
        # keep the failing environment around for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            set_validate_for_proxyless(gcp, True)
2240
2241
def test_csds(gcp, original_backend_service, instance_group, server_uri):
    """Validate the client's CSDS (xDS config dump) service output.

    Polls the client's CSDS endpoint until a config dump arrives containing a
    consistent LDS/RDS/CDS/EDS view (checked in both the legacy ``xds_config``
    and the newer ``generic_xds_configs`` forms), or raises once the 5-minute
    deadline expires.

    Args:
        gcp: the GcpState holding compute clients and created resources.
        original_backend_service: backend service the dump should route to.
        instance_group: instance group whose backends must be healthy first.
        server_uri: expected xDS Listener name.

    Raises:
        RuntimeError: if no valid xDS config dump is seen before the deadline.
    """
    test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
    sleep_interval_between_attempts_s = datetime.timedelta(
        seconds=2).total_seconds()
    logger.info('Running test_csds')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Test case timeout: 5 minutes
    deadline = time.time() + test_csds_timeout_s
    cnt = 0
    while time.time() <= deadline:
        client_config = get_client_xds_config_dump()
        logger.info('test_csds attempt %d: received xDS config %s', cnt,
                    json.dumps(client_config, indent=2))
        if client_config is not None:
            # Got the xDS config dump, now validate it
            ok = True
            try:
                if client_config['node']['locality']['zone'] != args.zone:
                    logger.info('Invalid zone %s != %s',
                                client_config['node']['locality']['zone'],
                                args.zone)
                    ok = False
                seen = set()
                # Legacy per-type config dump entries.
                for xds_config in client_config.get('xds_config', []):
                    if 'listener_config' in xds_config:
                        listener_name = xds_config['listener_config'][
                            'dynamic_listeners'][0]['active_state']['listener'][
                                'name']
                        if listener_name != server_uri:
                            logger.info('Invalid Listener name %s != %s',
                                        listener_name, server_uri)
                            ok = False
                        else:
                            seen.add('lds')
                    elif 'route_config' in xds_config:
                        num_vh = len(
                            xds_config['route_config']['dynamic_route_configs']
                            [0]['route_config']['virtual_hosts'])
                        if num_vh <= 0:
                            logger.info('Invalid number of VirtualHosts %s',
                                        num_vh)
                            ok = False
                        else:
                            seen.add('rds')
                    elif 'cluster_config' in xds_config:
                        cluster_type = xds_config['cluster_config'][
                            'dynamic_active_clusters'][0]['cluster']['type']
                        if cluster_type != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster_type)
                            ok = False
                        else:
                            seen.add('cds')
                    elif 'endpoint_config' in xds_config:
                        sub_zone = xds_config["endpoint_config"][
                            "dynamic_endpoint_configs"][0]["endpoint_config"][
                                "endpoints"][0]["locality"]["sub_zone"]
                        if args.zone not in sub_zone:
                            logger.info('Invalid endpoint sub_zone %s',
                                        sub_zone)
                            ok = False
                        else:
                            seen.add('eds')
                # Newer generic form of the dump.  Each failure log below uses
                # the value extracted in its own branch; previously these
                # referenced variables from the legacy loop above, which could
                # be unbound (NameError) or stale.
                for generic_xds_config in client_config.get(
                        'generic_xds_configs', []):
                    if re.search(r'\.Listener$',
                                 generic_xds_config['type_url']):
                        seen.add('lds')
                        listener = generic_xds_config["xds_config"]
                        if listener['name'] != server_uri:
                            logger.info('Invalid Listener name %s != %s',
                                        listener['name'], server_uri)
                            ok = False
                    elif re.search(r'\.RouteConfiguration$',
                                   generic_xds_config['type_url']):
                        seen.add('rds')
                        route_config = generic_xds_config["xds_config"]
                        if not len(route_config['virtual_hosts']):
                            logger.info('Invalid number of VirtualHosts %s',
                                        len(route_config['virtual_hosts']))
                            ok = False
                    elif re.search(r'\.Cluster$',
                                   generic_xds_config['type_url']):
                        seen.add('cds')
                        cluster = generic_xds_config["xds_config"]
                        if cluster['type'] != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster['type'])
                            ok = False
                    elif re.search(r'\.ClusterLoadAssignment$',
                                   generic_xds_config['type_url']):
                        seen.add('eds')
                        endpoint = generic_xds_config["xds_config"]
                        endpoint_sub_zone = endpoint["endpoints"][0][
                            "locality"]["sub_zone"]
                        if args.zone not in endpoint_sub_zone:
                            logger.info('Invalid endpoint sub_zone %s',
                                        endpoint_sub_zone)
                            ok = False
                want = {'lds', 'rds', 'cds', 'eds'}
                if seen != want:
                    logger.info('Incomplete xDS config dump, seen=%s', seen)
                    ok = False
            except Exception:
                # A partially-populated dump can raise KeyError/IndexError
                # during validation; treat it as "not yet valid" and retry.
                # (Narrowed from a bare except so KeyboardInterrupt escapes.)
                logger.exception('Error in xDS config dump:')
                ok = False
            finally:
                if ok:
                    # Successfully fetched xDS config, and they looks good.
                    logger.info('success')
                    return
        logger.info('test_csds attempt %d failed', cnt)
        # Give the client some time to fetch xDS resources
        time.sleep(sleep_interval_between_attempts_s)
        cnt += 1

    raise RuntimeError('failed to receive a valid xDS config in %s seconds' %
                       test_csds_timeout_s)
2362
2363
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless value.

    Patching a target gRPC proxy is unsupported, so the global forwarding
    rule and the proxy are torn down and recreated with the new setting.
    No-op unless the alpha API is enabled and exactly one forwarding rule,
    target proxy, and URL map exist.
    """
    if not gcp.alpha_compute:
        logger.debug(
            'Not setting validateForProxy because alpha is not enabled')
        return
    have_single_resources = (len(gcp.global_forwarding_rules) == 1 and
                             len(gcp.target_proxies) == 1 and
                             len(gcp.url_maps) == 1)
    if not have_single_resources:
        logger.debug(
            "Global forwarding rule, target proxy or url map not found.")
        return
    # Tear down, then rebuild with the requested flag.
    delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
    delete_target_proxy(gcp, gcp.target_proxies[0])
    create_target_proxy(gcp, target_proxy_name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, forwarding_rule_name, [gcp.service_port])
2381
2382
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service on an instance and return its response."""
    target = '%s:%d' % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        return stub.Check(health_pb2.HealthCheckRequest())
2387
2388
def set_serving_status(instances, service_port, serving):
    """Flip each instance's xDS test server between SERVING and NOT_SERVING.

    For every instance, calls SetServing/SetNotServing and then polls the
    health service until the reported status matches the requested one,
    retrying up to retry_count times.

    Args:
        instances: iterable of instance hostnames.
        service_port: port the test server listens on.
        serving: True for SERVING, False for NOT_SERVING.

    Raises:
        Exception: if an instance never reports the desired status.
    """
    logger.info('setting %s serving status to %s', instances, serving)
    for instance in instances:
        with grpc.insecure_channel('%s:%d' %
                                   (instance, service_port)) as channel:
            logger.info('setting %s serving status to %s', instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            retry_count = 5
            # Bug fix: iterate retry_count times (was a hard-coded 5) so the
            # loop bound and the give-up check below cannot drift apart.
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        'failed to set instance service status after %d retries'
                        % retry_count)
2411
2412
def is_primary_instance_group(gcp, instance_group):
    """Return True iff every RPC peer seen by the client belongs to instance_group."""
    # Clients may connect to a TD instance in a different region than the
    # client, in which case primary/secondary assignments may not be based on
    # the client's actual locality.
    member_names = set(get_instance_names(gcp, instance_group))
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    for peer in stats.rpcs_by_peer.keys():
        if peer not in member_names:
            return False
    return True
2420
2421
def get_startup_script(path_to_server_binary, service_port):
    """Build the VM startup script that launches the xDS test server.

    When a prebuilt server binary path is given, the script simply launches
    it; otherwise the script clones grpc-java and builds/runs the Java
    interop test server from source.
    """
    if not path_to_server_binary:
        # No prebuilt binary available: build the Java server on first boot.
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port
    return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary,
                                                 service_port)
2439
2440
def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    """Create a GCE instance template and record it as gcp.instance_template."""
    # Assemble the template properties separately for readability.
    properties = {
        'tags': {
            'items': ['allow-health-checks']
        },
        'machineType': machine_type,
        'serviceAccounts': [{
            'email': 'default',
            'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
        }],
        'networkInterfaces': [{
            'accessConfigs': [{
                'type': 'ONE_TO_ONE_NAT'
            }],
            'network': network
        }],
        'disks': [{
            'boot': True,
            'initializeParams': {
                'sourceImage': source_image
            },
            'autoDelete': True
        }],
        'metadata': {
            'items': [{
                'key': 'startup-script',
                'value': startup_script
            }]
        }
    }
    config = {'name': name, 'properties': properties}

    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.instance_template = GcpResource(config['name'], result['targetLink'])
2481
2482
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group, wait until it reaches size, return it."""
    config = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }

    logger.debug('Sending GCP request with body=%s', config)
    insert_result = gcp.compute.instanceGroupManagers().insert(
        project=gcp.project, zone=zone,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, insert_result['name'])
    # Re-read the manager to obtain the instanceGroup URL it created.
    get_result = gcp.compute.instanceGroupManagers().get(
        project=gcp.project, zone=zone,
        instanceGroupManager=config['name']).execute(
            num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(config['name'], get_result['instanceGroup'],
                                   zone)
    gcp.instance_groups.append(instance_group)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return instance_group
2509
2510
def create_health_check(gcp, name):
    """Create a health check and store it as gcp.health_check.

    Uses the gRPC-native health check when the alpha API is available,
    otherwise a TCP check on the named 'grpc' port.
    """
    use_alpha = bool(gcp.alpha_compute)
    compute_to_use = gcp.alpha_compute if use_alpha else gcp.compute
    config = {'name': name}
    if use_alpha:
        config['type'] = 'GRPC'
        config['grpcHealthCheck'] = {'portSpecification': 'USE_SERVING_PORT'}
    else:
        config['type'] = 'TCP'
        config['tcpHealthCheck'] = {'portName': 'grpc'}
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.healthChecks().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check = GcpResource(config['name'], result['targetLink'])
2535
2536
def create_health_check_firewall_rule(gcp, name):
    """Allow ingress TCP from GCP health-checker ranges to tagged instances."""
    firewall_config = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{'IPProtocol': 'tcp'}],
        # Documented GCP health-checking source ranges.
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', firewall_config)
    response = gcp.compute.firewalls().insert(
        project=gcp.project,
        body=firewall_config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    gcp.health_check_firewall_rule = GcpResource(firewall_config['name'],
                                                 response['targetLink'])
2553
2554
def add_backend_service(gcp, name):
    """Create a backend service, track it on gcp, and return the resource.

    GRPC protocol requires the alpha API; otherwise HTTP2 is used.
    """
    use_alpha = bool(gcp.alpha_compute)
    compute_to_use = gcp.alpha_compute if use_alpha else gcp.compute
    config = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': 'GRPC' if use_alpha else 'HTTP2'
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.backendServices().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    backend_service = GcpResource(config['name'], result['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
2576
2577
def create_url_map(gcp, name, backend_service, host_name):
    """Create a URL map routing host_name to backend_service; return the resource."""
    path_matcher = {
        'name': _PATH_MATCHER_NAME,
        'defaultService': backend_service.url,
    }
    host_rule = {
        'hosts': [host_name],
        'pathMatcher': _PATH_MATCHER_NAME
    }
    config = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [path_matcher],
        'hostRules': [host_rule]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    url_map = GcpResource(config['name'], result['targetLink'])
    gcp.url_maps.append(url_map)
    return url_map
2598
2599
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Rewrite the URL map's host rule to match host_name:service_port."""
    host_with_port = '%s:%d' % (host_name, gcp.service_port)
    config = {
        'hostRules': [{
            'hosts': [host_with_port],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
2612
2613
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None):
    """Create a target proxy (gRPC under alpha, HTTP otherwise) and return it.

    Defaults to the first tracked URL map when url_map is not given.
    """
    arg_url_map_url = url_map.url if url_map else gcp.url_maps[0].url
    config = {
        'name': name,
        'url_map': arg_url_map_url,
    }
    if gcp.alpha_compute:
        # validate_for_proxyless is only accepted by the alpha gRPC proxy API.
        config['validate_for_proxyless'] = validate_for_proxyless
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    else:
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.compute.targetHttpProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    target_proxy = GcpResource(config['name'], result['targetLink'])
    gcp.target_proxies.append(target_proxy)
    return target_proxy
2642
2643
def create_global_forwarding_rule(gcp,
                                  name,
                                  potential_ports,
                                  potential_ip_addresses=('0.0.0.0',),
                                  target_proxy=None):
    """Create a global forwarding rule, trying (port, ip) pairs until one works.

    On success, records the rule on gcp.global_forwarding_rules, sets
    gcp.service_port to the port that was accepted, and returns. HttpErrors
    (e.g. port already in use) are logged and the next combination is tried.

    Args:
        gcp: the GcpState holding compute clients and created resources.
        name: name for the forwarding rule.
        potential_ports: candidate ports to try in order.
        potential_ip_addresses: candidate IPs to try per port. A tuple default
            avoids the mutable-default-argument pitfall (callers passing a
            list are unaffected).
        target_proxy: proxy to target; defaults to the first tracked proxy.
    """
    if target_proxy:
        arg_target_proxy_url = target_proxy.url
    else:
        arg_target_proxy_url = gcp.target_proxies[0].url
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        for ip_address in potential_ip_addresses:
            try:
                config = {
                    'name': name,
                    'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                    'portRange': str(port),
                    'IPAddress': ip_address,
                    'network': args.network,
                    'target': arg_target_proxy_url,
                }
                logger.debug('Sending GCP request with body=%s', config)
                result = compute_to_use.globalForwardingRules().insert(
                    project=gcp.project,
                    body=config).execute(num_retries=_GCP_API_RETRIES)
                wait_for_global_operation(gcp, result['name'])
                global_forwarding_rule = GcpResource(config['name'],
                                                     result['targetLink'])
                gcp.global_forwarding_rules.append(global_forwarding_rule)
                gcp.service_port = port
                return
            except googleapiclient.errors.HttpError as http_error:
                # Lazy logging args (not eager %) per logging best practice.
                logger.warning(
                    'Got error %s when attempting to create forwarding rule to '
                    '%s:%d. Retrying with another port.', http_error,
                    ip_address, port)
2683
2684
def get_health_check(gcp, health_check_name):
    """Look up an existing health check; on failure record the error and a stub."""
    url = None
    try:
        result = gcp.compute.healthChecks().get(
            project=gcp.project, healthCheck=health_check_name).execute()
        url = result['selfLink']
    except Exception as e:
        # Best-effort lookup: remember why it failed, keep url as None.
        gcp.errors.append(e)
    gcp.health_check = GcpResource(health_check_name, url)
2693
2694
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up the health-check firewall rule; keep a stub and record errors on failure."""
    url = None
    try:
        result = gcp.compute.firewalls().get(project=gcp.project,
                                             firewall=firewall_name).execute()
        url = result['selfLink']
    except Exception as e:
        gcp.errors.append(e)
    gcp.health_check_firewall_rule = GcpResource(firewall_name, url)
2704
2705
def get_backend_service(gcp, backend_service_name, record_error=True):
    """Fetch an existing backend service; always tracked on gcp, even if missing."""
    url = None
    try:
        result = gcp.compute.backendServices().get(
            project=gcp.project, backendService=backend_service_name).execute()
        url = result['selfLink']
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
    backend_service = GcpResource(backend_service_name, url)
    gcp.backend_services.append(backend_service)
    return backend_service
2717
2718
def get_url_map(gcp, url_map_name, record_error=True):
    """Fetch an existing URL map; tracked on gcp only when the lookup succeeds."""
    try:
        result = gcp.compute.urlMaps().get(project=gcp.project,
                                           urlMap=url_map_name).execute()
        gcp.url_maps.append(GcpResource(url_map_name, result['selfLink']))
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
2728
2729
def get_target_proxy(gcp, target_proxy_name, record_error=True):
    """Fetch the existing target proxy (gRPC under alpha, HTTP otherwise)."""
    try:
        if gcp.alpha_compute:
            result = gcp.alpha_compute.targetGrpcProxies().get(
                project=gcp.project,
                targetGrpcProxy=target_proxy_name).execute()
        else:
            result = gcp.compute.targetHttpProxies().get(
                project=gcp.project,
                targetHttpProxy=target_proxy_name).execute()
        # Only track the proxy when the lookup succeeded.
        gcp.target_proxies.append(
            GcpResource(target_proxy_name, result['selfLink']))
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
2745
2746
def get_global_forwarding_rule(gcp, forwarding_rule_name, record_error=True):
    """Fetch an existing global forwarding rule; tracked on gcp only on success."""
    try:
        result = gcp.compute.globalForwardingRules().get(
            project=gcp.project, forwardingRule=forwarding_rule_name).execute()
        gcp.global_forwarding_rules.append(
            GcpResource(forwarding_rule_name, result['selfLink']))
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
2757
2758
def get_instance_template(gcp, template_name):
    """Fetch an existing instance template; record errors and keep a stub on failure."""
    url = None
    try:
        result = gcp.compute.instanceTemplates().get(
            project=gcp.project, instanceTemplate=template_name).execute()
        url = result['selfLink']
    except Exception as e:
        gcp.errors.append(e)
    gcp.instance_template = GcpResource(template_name, url)
2767
2768
def get_instance_group(gcp, zone, instance_group_name):
    """Fetch an existing instance group, remembering its named 'grpc' port.

    On failure the error is recorded and a stub (URL None) is still tracked
    and returned so later cleanup can reference it.
    """
    url = None
    try:
        result = gcp.compute.instanceGroups().get(
            project=gcp.project, zone=zone,
            instanceGroup=instance_group_name).execute()
        # Remember the service port before building the resource, matching
        # the original ordering (port is set even if selfLink were missing).
        gcp.service_port = result['namedPorts'][0]['port']
        url = result['selfLink']
    except Exception as e:
        gcp.errors.append(e)
    instance_group = InstanceGroup(instance_group_name, url, zone)
    gcp.instance_groups.append(instance_group)
    return instance_group
2782
2783
def delete_global_forwarding_rule(gcp, forwarding_rule_to_delete=None):
    """Delete one global forwarding rule and drop it from gcp's tracking.

    No-op when given a falsy rule. HTTP errors from the delete are logged
    and swallowed so cleanup can continue.
    """
    if not forwarding_rule_to_delete:
        return
    name = forwarding_rule_to_delete.name
    try:
        logger.debug('Deleting forwarding rule %s', name)
        operation = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
        try:
            gcp.global_forwarding_rules.remove(forwarding_rule_to_delete)
        except ValueError:
            logger.debug(
                'Forwarding rule %s does not exist in gcp.global_forwarding_rules',
                name)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2803
2804
def delete_global_forwarding_rules(gcp):
    """Delete every tracked forwarding rule; iterate over a snapshot since
    delete_global_forwarding_rule mutates gcp.global_forwarding_rules."""
    for rule in list(gcp.global_forwarding_rules):
        delete_global_forwarding_rule(gcp, rule)
2809
2810
def delete_target_proxy(gcp, proxy_to_delete=None):
    """Delete one target proxy and drop it from gcp.target_proxies.

    Deletes a gRPC proxy via the alpha API when available, otherwise an
    HTTP proxy via v1. No-op on a falsy argument; HTTP errors are logged
    and swallowed.
    """
    if not proxy_to_delete:
        return
    name = proxy_to_delete.name
    try:
        if gcp.alpha_compute:
            logger.debug('Deleting grpc proxy %s', name)
            operation = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project,
                targetGrpcProxy=name).execute(num_retries=_GCP_API_RETRIES)
        else:
            logger.debug('Deleting http proxy %s', name)
            operation = gcp.compute.targetHttpProxies().delete(
                project=gcp.project,
                targetHttpProxy=name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
        try:
            gcp.target_proxies.remove(proxy_to_delete)
        except ValueError:
            logger.debug('Gcp proxy %s does not exist in gcp.target_proxies',
                         name)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2835
2836
def delete_target_proxies(gcp):
    """Delete all tracked target proxies; iterate over a snapshot since
    delete_target_proxy mutates gcp.target_proxies."""
    for proxy in list(gcp.target_proxies):
        delete_target_proxy(gcp, proxy)
2841
2842
def delete_url_map(gcp, url_map_to_delete=None):
    """Delete one url map and drop it from gcp.url_maps.

    No-op on a falsy argument; HTTP errors from the delete are logged and
    swallowed so cleanup can continue.
    """
    if not url_map_to_delete:
        return
    name = url_map_to_delete.name
    try:
        logger.debug('Deleting url map %s', name)
        operation = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
        try:
            gcp.url_maps.remove(url_map_to_delete)
        except ValueError:
            logger.debug('Url map %s does not exist in gcp.url_maps',
                         name)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2859
2860
def delete_url_maps(gcp):
    """Delete all tracked url maps; iterate over a snapshot since
    delete_url_map mutates gcp.url_maps."""
    for url_map in list(gcp.url_maps):
        delete_url_map(gcp, url_map)
2865
2866
def delete_backend_service(gcp, backend_service):
    """Delete a single backend service and wait for the global operation.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting backend service %s', backend_service.name)
        operation = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2876
2877
def delete_backend_services(gcp):
    """Delete every backend service recorded on the GCP state."""
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
2881
2882
def delete_firewall(gcp):
    """Delete the health-check firewall rule recorded on gcp.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    rule_name = gcp.health_check_firewall_rule.name
    try:
        logger.debug('Deleting firewall %s',
                     rule_name)
        operation = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=rule_name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2894
2895
def delete_health_check(gcp):
    """Delete the health check recorded on gcp.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting health check %s', gcp.health_check.name)
        operation = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2905
2906
def delete_instance_groups(gcp):
    """Delete the managed instance group behind each tracked group.

    Waits (with the longer backend timeout) for each zonal operation;
    per-group HTTP errors are logged and do not stop the loop.
    """
    for group in gcp.instance_groups:
        try:
            logger.debug('Deleting instance group %s %s', group.name,
                         group.zone)
            operation = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=group.zone,
                instanceGroupManager=group.name).execute(
                    num_retries=_GCP_API_RETRIES)
            wait_for_zone_operation(gcp,
                                    group.zone,
                                    operation['name'],
                                    timeout_sec=_WAIT_FOR_BACKEND_SEC)
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
2923
2924
def delete_instance_template(gcp):
    """Delete the instance template recorded on gcp.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting instance template %s',
                     gcp.instance_template.name)
        operation = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2936
2937
def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          max_rate=1,
                          circuit_breakers=None):
    """Patch a backend service to serve from the given instance groups.

    maxRate is only meaningful for RATE balancing and is sent as None for
    other modes. The alpha API is preferred when available so alpha-only
    fields such as circuitBreakers are accepted. Blocks until the patch
    operation completes.
    """
    compute_to_use = gcp.alpha_compute or gcp.compute
    backends = []
    for instance_group in instance_groups:
        backends.append({
            'group': instance_group.url,
            'balancingMode': balancing_mode,
            'maxRate': max_rate if balancing_mode == 'RATE' else None
        })
    config = {
        'backends': backends,
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              result['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
2963
2964
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and block until it reaches new_size.

    timeout_sec bounds only the wait for the group to actually report the
    expected number of instances; the resize operation itself is waited on
    with a separate fixed bound below.
    """
    result = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    # NOTE(review): this wait uses a hard-coded 360s bound rather than the
    # timeout_sec parameter — presumably intentional (resize operations can
    # be slow regardless of caller timeout), but worth confirming.
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            result['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
2980
2981
def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None,
                                  url_map=None):
    '''Change url_map's backend service.

    Only one of backend_service and services_with_weights can be not None.

    Args:
      gcp: GcpState holding the compute clients and tracked resources.
      backend_service: if set, used as the path matcher's defaultService.
      services_with_weights: dict mapping backend services to weights,
        rendered as a weightedBackendServices defaultRouteAction; mutually
        exclusive with backend_service.
      route_rules: optional routeRules list for the path matcher.
      url_map: url map to patch; defaults to gcp.url_maps[0].

    Raises:
      ValueError: if both backend_service and services_with_weights are set.
    '''
    # Bug fix: this docstring previously appeared *after* executable code,
    # where it was a dead string expression rather than the function's
    # docstring; it now documents the function properly.
    if url_map:
        url_map_name = url_map.name
    else:
        url_map_name = gcp.url_maps[0].name
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute

    if backend_service and services_with_weights:
        # Fixed error message: the parameter is services_with_weights.
        raise ValueError(
            'both backend_service and services_with_weights are not None.')

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in services_with_weights.items()]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.urlMaps().patch(
        project=gcp.project, urlMap=url_map_name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
3025
3026
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll every 2s until the group reports exactly expected_size
    instances; raise Exception once timeout_sec elapses."""
    deadline = time.time() + timeout_sec
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            return
        if time.time() > deadline:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)
3039
3040
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a global compute operation every 2s until it reaches DONE.

    Raises Exception with the operation's error payload if the completed
    operation reports one, or a timeout message when timeout_sec elapses.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        status = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if status['status'] == 'DONE':
            if 'error' in status:
                raise Exception(status['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
3056
3057
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a zonal compute operation every 2s until it reaches DONE.

    Raises Exception with the operation's error payload if the completed
    operation reports one, or a timeout message when timeout_sec elapses.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        status = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if status['status'] == 'DONE':
            if 'error' in status:
                raise Exception(status['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
3074
3075
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll GCP until every instance in instance_group reports HEALTHY.

    Each iteration also probes each instance's gRPC serving status, but
    purely for diagnostic logging — the pass/fail decision is based solely
    on backendServices().getHealth(). Success requires every reported
    healthState to be HEALTHY and the report to cover exactly the number
    of instances in the group. Raises Exception on timeout.
    """
    start_time = time.time()
    config = {'group': instance_group.url}
    # Instance list and expected count are snapshotted once, up front.
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    while time.time() - start_time <= timeout_sec:
        for instance_name in instance_names:
            try:
                # Best-effort gRPC health probe; failures are only logged.
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received GCP healthStatus: %s', result['healthStatus'])
            healthy = True
            for instance in result['healthStatus']:
                if instance['healthState'] != 'HEALTHY':
                    healthy = False
                    break
            # All reported instances healthy AND the report covers the
            # whole group (a partial report is not success).
            if healthy and expected_size == len(result['healthStatus']):
                return
        else:
            logger.info('no healthStatus received from GCP')
        time.sleep(5)
    # The final (failing) getHealth result is included for diagnosis.
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
3111
3112
def get_instance_names(gcp, instance_group):
    """Return the short names of all instances in instance_group.

    Instances in every state are listed. Returns [] when GCP reports no
    items at all.
    """
    result = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in result:
        return []
    # listInstances() returns each instance's full URL, but
    # compute.instances().get() needs the bare instance name, so only the
    # final path component is kept.
    instance_names = [
        item['instance'].split('/')[-1] for item in result['items']
    ]
    logger.info('retrieved instance names: %s', instance_names)
    return instance_names
3133
3134
def clean_up(gcp):
    """Tear down all tracked GCP resources in reverse-dependency order.

    Forwarding rules are removed before the target proxies they point at,
    proxies before url maps, and url maps before backend services. The
    singleton resources (firewall rule, health check, instance template)
    are only deleted when they were actually created or looked up.
    """
    delete_global_forwarding_rules(gcp)
    delete_target_proxies(gcp)
    delete_url_maps(gcp)
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
3147
3148
class InstanceGroup(object):
    """Record for a GCE instance group: name, selfLink URL, and zone."""

    def __init__(self, name, url, zone):
        # url may be None for placeholder groups created when lookup failed.
        self.name, self.url, self.zone = name, url, zone
3155
3156
class GcpResource(object):
    """Record for a global GCP resource: its name and selfLink URL."""

    def __init__(self, name, url):
        # url may be None for placeholders created when a lookup failed.
        self.name, self.url = name, url
3162
3163
class GcpState(object):
    """Mutable bag of GCP clients plus every resource the run touches.

    The resource lists start empty and are appended to as resources are
    created or looked up; errors collects lookup failures for reporting.
    """

    def __init__(self, compute, alpha_compute, project, project_num):
        self.compute = compute
        self.alpha_compute = alpha_compute
        self.project = project
        self.project_num = project_num
        # Singleton resources: None until created or looked up.
        self.health_check = None
        self.health_check_firewall_rule = None
        self.instance_template = None
        self.service_port = None
        # Tracked resource collections.
        self.backend_services = []
        self.url_maps = []
        self.target_proxies = []
        self.global_forwarding_rules = []
        self.instance_groups = []
        self.errors = []
3181
3182
# Log the start time and local timezone so logs from different hosts can be
# correlated when diagnosing cross-machine test runs.
logging.debug(
    "script start time: %s",
    datetime.datetime.now(
        datetime.timezone.utc).astimezone().strftime("%Y-%m-%dT%H:%M:%S %Z"))
logging.debug("logging local timezone: %s",
              datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo)
# Build the compute API clients. An on-disk discovery document, when given,
# overrides the live discovery service; the alpha client is only built when
# alpha APIs are allowed (and, in the file-based case, when an alpha
# discovery document was also supplied).
alpha_compute = None
if args.compute_discovery_document:
    with open(args.compute_discovery_document, 'r') as discovery_doc:
        compute = googleapiclient.discovery.build_from_document(
            discovery_doc.read())
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
            alpha_compute = googleapiclient.discovery.build_from_document(
                discovery_doc.read())
else:
    compute = googleapiclient.discovery.build('compute', 'v1')
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build('compute', 'alpha')

# Per-test-case outcomes, reported after the run completes.
test_results = {}
failed_tests = []
3205try:
3206    gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
3207    gcp_suffix = args.gcp_suffix
3208    health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
3209    if not args.use_existing_gcp_resources:
3210        if args.keep_gcp_resources:
3211            # Auto-generating a unique suffix in case of conflict should not be
3212            # combined with --keep_gcp_resources, as the suffix actually used
3213            # for GCP resources will not match the provided --gcp_suffix value.
3214            num_attempts = 1
3215        else:
3216            num_attempts = 5
3217        for i in range(num_attempts):
3218            try:
3219                logger.info('Using GCP suffix %s', gcp_suffix)
3220                create_health_check(gcp, health_check_name)
3221                break
3222            except googleapiclient.errors.HttpError as http_error:
3223                gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
3224                health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
3225                logger.exception('HttpError when creating health check')
3226        if gcp.health_check is None:
3227            raise Exception('Failed to create health check name after %d '
3228                            'attempts' % num_attempts)
3229    firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
3230    backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
3231    alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
3232    extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
3233    more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
3234    url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
3235    url_map_name_2 = url_map_name + '2'
3236    service_host_name = _BASE_SERVICE_HOST + gcp_suffix
3237    target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
3238    target_proxy_name_2 = target_proxy_name + '2'
3239    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
3240    forwarding_rule_name_2 = forwarding_rule_name + '2'
3241    template_name = _BASE_TEMPLATE_NAME + gcp_suffix
3242    instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
3243    same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
3244    secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
3245    potential_service_ports = list(args.service_port_range)
3246    random.shuffle(potential_service_ports)
3247    if args.use_existing_gcp_resources:
3248        logger.info('Reusing existing GCP resources')
3249        get_health_check(gcp, health_check_name)
3250        get_health_check_firewall_rule(gcp, firewall_name)
3251        backend_service = get_backend_service(gcp, backend_service_name)
3252        alternate_backend_service = get_backend_service(
3253            gcp, alternate_backend_service_name)
3254        extra_backend_service = get_backend_service(gcp,
3255                                                    extra_backend_service_name,
3256                                                    record_error=False)
3257        more_extra_backend_service = get_backend_service(
3258            gcp, more_extra_backend_service_name, record_error=False)
3259        get_url_map(gcp, url_map_name)
3260        get_target_proxy(gcp, target_proxy_name)
3261        get_global_forwarding_rule(gcp, forwarding_rule_name)
3262        get_url_map(gcp, url_map_name_2, record_error=False)
3263        get_target_proxy(gcp, target_proxy_name_2, record_error=False)
3264        get_global_forwarding_rule(gcp,
3265                                   forwarding_rule_name_2,
3266                                   record_error=False)
3267        get_instance_template(gcp, template_name)
3268        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
3269        same_zone_instance_group = get_instance_group(
3270            gcp, args.zone, same_zone_instance_group_name)
3271        secondary_zone_instance_group = get_instance_group(
3272            gcp, args.secondary_zone, secondary_zone_instance_group_name)
3273        if gcp.errors:
3274            raise Exception(gcp.errors)
3275    else:
3276        create_health_check_firewall_rule(gcp, firewall_name)
3277        backend_service = add_backend_service(gcp, backend_service_name)
3278        alternate_backend_service = add_backend_service(
3279            gcp, alternate_backend_service_name)
3280        create_url_map(gcp, url_map_name, backend_service, service_host_name)
3281        create_target_proxy(gcp, target_proxy_name)
3282        create_global_forwarding_rule(gcp, forwarding_rule_name,
3283                                      potential_service_ports)
3284        if not gcp.service_port:
3285            raise Exception(
3286                'Failed to find a valid ip:port for the forwarding rule')
3287        if gcp.service_port != _DEFAULT_SERVICE_PORT:
3288            patch_url_map_host_rule_with_port(gcp, url_map_name,
3289                                              backend_service,
3290                                              service_host_name)
3291        startup_script = get_startup_script(args.path_to_server_binary,
3292                                            gcp.service_port)
3293        create_instance_template(gcp, template_name, args.network,
3294                                 args.source_image, args.machine_type,
3295                                 startup_script)
3296        instance_group = add_instance_group(gcp, args.zone, instance_group_name,
3297                                            _INSTANCE_GROUP_SIZE)
3298        patch_backend_service(gcp, backend_service, [instance_group])
3299        same_zone_instance_group = add_instance_group(
3300            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
3301        secondary_zone_instance_group = add_instance_group(
3302            gcp, args.secondary_zone, secondary_zone_instance_group_name,
3303            _INSTANCE_GROUP_SIZE)
3304
3305    wait_for_healthy_backends(gcp, backend_service, instance_group)
3306
3307    if args.test_case:
3308        client_env = dict(os.environ)
3309        if original_grpc_trace:
3310            client_env['GRPC_TRACE'] = original_grpc_trace
3311        if original_grpc_verbosity:
3312            client_env['GRPC_VERBOSITY'] = original_grpc_verbosity
3313        bootstrap_server_features = []
3314
3315        if gcp.service_port == _DEFAULT_SERVICE_PORT:
3316            server_uri = service_host_name
3317        else:
3318            server_uri = service_host_name + ':' + str(gcp.service_port)
3319        if args.xds_v3_support:
3320            client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
3321            bootstrap_server_features.append('xds_v3')
3322        if args.bootstrap_file:
3323            bootstrap_path = os.path.abspath(args.bootstrap_file)
3324        else:
3325            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
3326                bootstrap_file.write(
3327                    _BOOTSTRAP_TEMPLATE.format(
3328                        node_id='projects/%s/networks/%s/nodes/%s' %
3329                        (gcp.project_num, args.network.split('/')[-1],
3330                         uuid.uuid1()),
3331                        server_features=json.dumps(
3332                            bootstrap_server_features)).encode('utf-8'))
3333                bootstrap_path = bootstrap_file.name
3334        client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
3335        client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
3336        client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
3337        client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
3338        for test_case in args.test_case:
3339            if test_case in _V3_TEST_CASES and not args.xds_v3_support:
3340                logger.info('skipping test %s due to missing v3 support',
3341                            test_case)
3342                continue
3343            if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
3344                logger.info('skipping test %s due to missing alpha support',
3345                            test_case)
3346                continue
3347            if test_case in [
3348                    'api_listener', 'forwarding_rule_port_match',
3349                    'forwarding_rule_default_port'
3350            ] and CLIENT_HOSTS:
3351                logger.info(
3352                    'skipping test %s because test configuration is'
3353                    'not compatible with client processes on existing'
3354                    'client hosts', test_case)
3355                continue
3356            if test_case == 'forwarding_rule_default_port':
3357                server_uri = service_host_name
3358            result = jobset.JobResult()
3359            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
3360            if not os.path.exists(log_dir):
3361                os.makedirs(log_dir)
3362            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
3363            test_log_file = open(test_log_filename, 'w+')
3364            client_process = None
3365
3366            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
3367                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
3368            else:
3369                rpcs_to_send = '--rpc="UnaryCall"'
3370
3371            if test_case in _TESTS_TO_SEND_METADATA:
3372                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
3373                    keyE=_TEST_METADATA_KEY,
3374                    valueE=_TEST_METADATA_VALUE_EMPTY,
3375                    keyU=_TEST_METADATA_KEY,
3376                    valueU=_TEST_METADATA_VALUE_UNARY,
3377                    keyNU=_TEST_METADATA_NUMERIC_KEY,
3378                    valueNU=_TEST_METADATA_NUMERIC_VALUE)
3379            else:
3380                # Setting the arg explicitly to empty with '--metadata=""'
3381                # makes C# client fail
3382                # (see https://github.com/commandlineparser/commandline/issues/412),
3383                # so instead we just rely on clients using the default when
3384                # metadata arg is not specified.
3385                metadata_to_send = ''
3386
3387            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
3388            # in the client. This means we will ignore intermittent RPC
3389            # failures (but this framework still checks that the final result
3390            # is as expected).
3391            #
3392            # Reason for disabling this is, the resources are shared by
3393            # multiple tests, and a change in previous test could be delayed
3394            # until the second test starts. The second test may see
3395            # intermittent failures because of that.
3396            #
3397            # A fix is to not share resources between tests (though that does
3398            # mean the tests will be significantly slower due to creating new
3399            # resources).
3400            fail_on_failed_rpc = ''
3401
3402            try:
3403                if not CLIENT_HOSTS:
3404                    client_cmd_formatted = args.client_cmd.format(
3405                        server_uri=server_uri,
3406                        stats_port=args.stats_port,
3407                        qps=args.qps,
3408                        fail_on_failed_rpc=fail_on_failed_rpc,
3409                        rpcs_to_send=rpcs_to_send,
3410                        metadata_to_send=metadata_to_send)
3411                    logger.debug('running client: %s', client_cmd_formatted)
3412                    client_cmd = shlex.split(client_cmd_formatted)
3413                    client_process = subprocess.Popen(client_cmd,
3414                                                      env=client_env,
3415                                                      stderr=subprocess.STDOUT,
3416                                                      stdout=test_log_file)
                # Dispatch to the implementation of the selected test case.
                # Note that a few cases ('api_listener' and the two
                # 'forwarding_rule_*' cases) rebind server_uri, which later
                # iterations/cases (e.g. 'csds') then use.
                if test_case == 'backends_restart':
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == 'change_backend_service':
                    test_change_backend_service(gcp, backend_service,
                                                instance_group,
                                                alternate_backend_service,
                                                same_zone_instance_group)
                elif test_case == 'gentle_failover':
                    test_gentle_failover(gcp, backend_service, instance_group,
                                         secondary_zone_instance_group)
                elif test_case == 'load_report_based_failover':
                    test_load_report_based_failover(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'ping_pong':
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == 'remove_instance_group':
                    test_remove_instance_group(gcp, backend_service,
                                               instance_group,
                                               same_zone_instance_group)
                elif test_case == 'round_robin':
                    test_round_robin(gcp, backend_service, instance_group)
                elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'traffic_splitting':
                    test_traffic_splitting(gcp, backend_service, instance_group,
                                           alternate_backend_service,
                                           same_zone_instance_group)
                elif test_case == 'path_matching':
                    test_path_matching(gcp, backend_service, instance_group,
                                       alternate_backend_service,
                                       same_zone_instance_group)
                elif test_case == 'header_matching':
                    test_header_matching(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'circuit_breaking':
                    test_circuit_breaking(gcp, backend_service, instance_group,
                                          same_zone_instance_group)
                elif test_case == 'timeout':
                    test_timeout(gcp, backend_service, instance_group)
                elif test_case == 'fault_injection':
                    test_fault_injection(gcp, backend_service, instance_group)
                elif test_case == 'api_listener':
                    server_uri = test_api_listener(gcp, backend_service,
                                                   instance_group,
                                                   alternate_backend_service)
                elif test_case == 'forwarding_rule_port_match':
                    server_uri = test_forwarding_rule_port_match(
                        gcp, backend_service, instance_group)
                elif test_case == 'forwarding_rule_default_port':
                    server_uri = test_forwarding_rule_default_port(
                        gcp, backend_service, instance_group)
                elif test_case == 'metadata_filter':
                    test_metadata_filter(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'csds':
                    test_csds(gcp, backend_service, instance_group, server_uri)
                else:
                    # Unknown test case names are treated as a fatal driver
                    # misconfiguration, not a test failure.
                    logger.error('Unknown test case: %s', test_case)
                    sys.exit(1)
                # A client that exited while the test was running invalidates
                # the result even if the test logic itself passed, so fail the
                # case. poll() also populates client_process.returncode, which
                # the finally block below inspects.
                if client_process and client_process.poll() is not None:
                    raise Exception(
                        'Client process exited prematurely with exit code %d' %
                        client_process.returncode)
                result.state = 'PASSED'
                result.returncode = 0
            except Exception as e:
                # Record the failure but keep running remaining test cases
                # unless --halt_after_fail was requested.
                logger.exception('Test case %s failed', test_case)
                failed_tests.append(test_case)
                result.state = 'FAILED'
                result.message = str(e)
                if args.halt_after_fail:
                    # Stop the test suite if one case failed.
                    raise
            finally:
                if client_process:
                    # returncode is only set once the process has been reaped
                    # (e.g. by the poll() above); if it is still unset/zero,
                    # assume the client is running and shut it down.
                    if client_process.returncode:
                        logger.info('Client exited with code %d' %
                                    client_process.returncode)
                    else:
                        client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke decode() on
                # result.message, which has a default value of ''.
                result.message = result.message.encode('UTF-8')
                test_results[test_case] = [result]
                if args.log_client_output:
                    # Replay the captured client log into the driver's log.
                    logger.info('Client output:')
                    with open(test_log_filename, 'r') as client_output:
                        logger.info(client_output.read())
        # Render a JUnit-style XML report covering every executed test case,
        # then exit non-zero if any case failed.
        if not os.path.exists(_TEST_LOG_BASE_DIR):
            os.makedirs(_TEST_LOG_BASE_DIR)
        report_utils.render_junit_xml_report(test_results,
                                             os.path.join(
                                                 _TEST_LOG_BASE_DIR,
                                                 _SPONGE_XML_NAME),
                                             suite_name='xds_tests',
                                             multi_target=True)
        if failed_tests:
            logger.error('Test case(s) %s failed', failed_tests)
            sys.exit(1)
finally:
    # Always-run cleanup for the whole suite: delete the GCP resources unless
    # the user explicitly asked to keep them, or --halt_after_fail tripped (in
    # which case the environment is preserved for debugging the failure).
    keep_resources = args.keep_gcp_resources
    if args.halt_after_fail and failed_tests:
        logger.info(
            'Halt after fail triggered, exiting without cleaning up resources')
        keep_resources = True
    if not keep_resources:
        logger.info('Cleaning up GCP resources. This may take some time.')
        clean_up(gcp)
3535