1# -*- coding: utf-8 -*- 2# Copyright 2015 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# pylint:mode=test 16"""Unit tests for analytics data collection.""" 17 18from __future__ import absolute_import 19 20import logging 21import os 22import pickle 23import re 24import socket 25import subprocess 26import sys 27import tempfile 28 29from apitools.base.py import exceptions as apitools_exceptions 30from apitools.base.py import http_wrapper 31from boto.storage_uri import BucketStorageUri 32 33from gslib import metrics 34from gslib import VERSION 35from gslib.cs_api_map import ApiSelector 36import gslib.exception 37from gslib.gcs_json_api import GcsJsonApi 38from gslib.metrics import MetricsCollector 39from gslib.metrics_reporter import LOG_FILE_PATH 40from gslib.metrics_tuple import Metric 41from gslib.tests.mock_logging_handler import MockLoggingHandler 42import gslib.tests.testcase as testcase 43from gslib.tests.testcase.integration_testcase import SkipForS3 44from gslib.tests.util import HAS_S3_CREDS 45from gslib.tests.util import ObjectToURI as suri 46from gslib.tests.util import SetBotoConfigForTest 47from gslib.tests.util import unittest 48from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages 49from gslib.thread_message import FileMessage 50from gslib.thread_message import RetryableErrorMessage 51from gslib.util import IS_LINUX 52from gslib.util import IS_WINDOWS 53from gslib.util import LogAndHandleRetries 54from gslib.util import ONE_KIB 55from gslib.util import ONE_MIB 56from gslib.util import START_CALLBACK_PER_BYTES 57import mock 58 59# A piece of the URL logged for all of the tests. 60GLOBAL_DIMENSIONS_URL_PARAMS = ( 61 'a=b&c=d&cd1=cmd1+action1&cd10=0&cd2=x%2Cy%2Cz&cd3=opta%2Coptb&' 62 'cd6=CommandException&cm1=0') 63 64GLOBAL_PARAMETERS = ['a=b', 'c=d', 'cd1=cmd1 action1', 'cd2=x,y,z', 65 'cd3=opta,optb', 'cd6=CommandException', 'cm1=0', 66 'ev=0', 'el={0}'.format(VERSION)] 67COMMAND_AND_ERROR_TEST_METRICS = set([ 68 Metric( 69 'https://example.com', 'POST', 70 '{0}&cm2=3&ea=cmd1+action1&ec={1}&el={2}&ev=0'.format( 71 GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_COMMANDS_CATEGORY, 72 VERSION), 73 'user-agent-007'), 74 Metric( 75 'https://example.com', 'POST', 76 '{0}&cm2=2&ea=Exception&ec={1}&el={2}&ev=0'.format( 77 GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_ERRORRETRY_CATEGORY, 78 VERSION), 79 'user-agent-007'), 80 Metric( 81 'https://example.com', 'POST', 82 '{0}&cm2=1&ea=ValueError&ec={1}&el={2}&ev=0'.format( 83 GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_ERRORRETRY_CATEGORY, 84 VERSION), 85 'user-agent-007'), 86 Metric( 87 'https://example.com', 'POST', 88 '{0}&ea=CommandException&ec={1}&el={2}&ev=0'.format( 89 GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_ERRORFATAL_CATEGORY, 90 VERSION), 91 'user-agent-007') 92]) 93 94# A regex to find the list of metrics in log output. 95METRICS_LOG_RE = re.compile(r'(\[Metric.*\])') 96 97 98def _TryExceptAndPass(func, *args, **kwargs): 99 """Calls the given function with the arguments and ignores exceptions. 100 101 In these tests, we often force a failure that doesn't matter in order to 102 check that a metric was collected. 103 104 Args: 105 func: The function to call. 106 *args: Any arguments to call the function with. 107 **kwargs: Any named arguments to call the function with. 108 """ 109 try: 110 func(*args, **kwargs) 111 except: # pylint: disable=bare-except 112 pass 113 114 115def _LogAllTestMetrics(): 116 """Logs all the common metrics for a test.""" 117 metrics.LogCommandParams( 118 command_name='cmd1', subcommands=['action1'], 119 global_opts=[('-y', 'value'), ('-z', ''), ('-x', '')], 120 sub_opts=[('optb', ''), ('opta', '')]) 121 retry_msg_1 = RetryableErrorMessage(Exception(), 0) 122 retry_msg_2 = RetryableErrorMessage(ValueError(), 0) 123 metrics.LogRetryableError(retry_msg_1) 124 metrics.LogRetryableError(retry_msg_1) 125 metrics.LogRetryableError(retry_msg_2) 126 metrics.LogFatalError(gslib.exception.CommandException('test')) 127 128 129class RetryableErrorsQueue(object): 130 """Emulates Cloud API status queue, processes only RetryableErrorMessages.""" 131 132 def put(self, status_item): # pylint: disable=invalid-name 133 if isinstance(status_item, RetryableErrorMessage): 134 metrics.LogRetryableError(status_item) 135 136 137@mock.patch('time.time', new=mock.MagicMock(return_value=0)) 138class TestMetricsUnitTests(testcase.GsUtilUnitTestCase): 139 """Unit tests for analytics data collection.""" 140 141 def setUp(self): 142 super(TestMetricsUnitTests, self).setUp() 143 144 # Save the original state of the collector. 145 self.original_collector_instance = MetricsCollector.GetCollector() 146 147 # Set dummy attributes for the collector. 148 MetricsCollector.StartTestCollector('https://example.com', 'user-agent-007', 149 {'a': 'b', 'c': 'd'}) 150 self.collector = MetricsCollector.GetCollector() 151 152 self.log_handler = MockLoggingHandler() 153 # Use metrics logger to avoid impacting the root logger which may 154 # interfere with other tests. 155 logging.getLogger('metrics').setLevel(logging.DEBUG) 156 logging.getLogger('metrics').addHandler(self.log_handler) 157 158 def tearDown(self): 159 super(TestMetricsUnitTests, self).tearDown() 160 161 # Reset to default collection settings. 162 MetricsCollector.StopTestCollector( 163 original_instance=self.original_collector_instance) 164 165 def testDisabling(self): 166 """Tests enabling/disabling of metrics collection.""" 167 self.assertEqual(self.collector, MetricsCollector.GetCollector()) 168 169 # Test when gsutil is part of the Cloud SDK and the user opted in there. 170 with mock.patch.dict(os.environ, 171 values={'CLOUDSDK_WRAPPER': '1', 172 'GA_CID': '555'}): 173 MetricsCollector._CheckAndSetDisabledCache() 174 self.assertFalse(MetricsCollector._disabled_cache) 175 self.assertEqual(self.collector, MetricsCollector.GetCollector()) 176 177 # Test when gsutil is part of the Cloud SDK and the user did not opt in 178 # there. 179 with mock.patch.dict(os.environ, 180 values={'CLOUDSDK_WRAPPER': '1', 181 'GA_CID': ''}): 182 MetricsCollector._CheckAndSetDisabledCache() 183 self.assertTrue(MetricsCollector._disabled_cache) 184 self.assertEqual(None, MetricsCollector.GetCollector()) 185 186 # Test when gsutil is not part of the Cloud SDK and there is no UUID file. 187 with mock.patch.dict(os.environ, values={'CLOUDSDK_WRAPPER': ''}): 188 with mock.patch('os.path.exists', return_value=False): 189 MetricsCollector._CheckAndSetDisabledCache() 190 self.assertTrue(MetricsCollector._disabled_cache) 191 self.assertEqual(None, MetricsCollector.GetCollector()) 192 193 # Test when gsutil is not part of the Cloud SDK and there is a UUID file. 194 with mock.patch.dict(os.environ, values={'CLOUDSDK_WRAPPER': ''}): 195 with mock.patch('os.path.exists', return_value=True): 196 # Mock the contents of the file. 197 with mock.patch('__builtin__.open') as mock_open: 198 mock_open.return_value.__enter__ = lambda s: s 199 200 # Set the file.read() method to return the disabled text. 201 mock_open.return_value.read.return_value = metrics._DISABLED_TEXT 202 MetricsCollector._CheckAndSetDisabledCache() 203 self.assertTrue(MetricsCollector._disabled_cache) 204 self.assertEqual(None, MetricsCollector.GetCollector()) 205 206 # Set the file.read() method to return a mock cid (analytics enabled). 207 mock_open.return_value.read.return_value = 'mock_cid' 208 MetricsCollector._CheckAndSetDisabledCache() 209 self.assertFalse(MetricsCollector._disabled_cache) 210 self.assertEqual(self.collector, MetricsCollector.GetCollector()) 211 212 # Check that open/read was called twice. 213 self.assertEqual(2, len(mock_open.call_args_list)) 214 self.assertEqual(2, len(mock_open.return_value.read.call_args_list)) 215 216 def testConfigValueValidation(self): 217 """Tests the validation of potentially PII config values.""" 218 string_and_bool_categories = ['check_hashes', 'content_language', 219 'disable_analytics_prompt', 220 'https_validate_certificates', 221 'json_api_version', 222 'parallel_composite_upload_component_size', 223 'parallel_composite_upload_threshold', 224 'prefer_api', 225 'sliced_object_download_component_size', 226 'sliced_object_download_threshold', 227 'tab_completion_time_logs', 'token_cache', 228 'use_magicfile'] 229 int_categories = ['debug', 'default_api_version', 'http_socket_timeout', 230 'max_retry_delay', 'num_retries', 231 'oauth2_refresh_retries', 'parallel_process_count', 232 'parallel_thread_count', 'resumable_threshold', 233 'rsync_buffer_lines', 234 'sliced_object_download_max_components', 235 'software_update_check_period', 'tab_completion_timeout', 236 'task_estimation_threshold'] 237 all_categories = sorted(string_and_bool_categories + int_categories) 238 239 # Test general invalid values. 240 with mock.patch('boto.config.get_value', return_value=None): 241 self.assertEqual('', self.collector._ValidateAndGetConfigValues()) 242 243 with mock.patch('boto.config.get_value', return_value='invalid string'): 244 self.assertEqual(','.join([ 245 category + ':INVALID' for category in all_categories 246 ]), self.collector._ValidateAndGetConfigValues()) 247 248 # Test that non-ASCII characters are invalid. 249 with mock.patch('boto.config.get_value', return_value='£'): 250 self.assertEqual(','.join([ 251 category + ':INVALID' for category in all_categories 252 ]), self.collector._ValidateAndGetConfigValues()) 253 254 # Mock valid return values for specific string validations. 255 def MockValidStrings(section, category): 256 if section == 'GSUtil': 257 if category == 'check_hashes': 258 return 'if_fast_else_skip' 259 if category == 'content_language': 260 return 'chi' 261 if category == 'json_api_version': 262 return 'v3' 263 if category == 'prefer_api': 264 return 'xml' 265 if category in ('disable_analytics_prompt', 'use_magicfile', 266 'tab_completion_time_logs'): 267 return 'True' 268 if section == 'OAuth2' and category == 'token_cache': 269 return 'file_system' 270 if section == 'Boto' and category == 'https_validate_certificates': 271 return 'True' 272 return '' 273 with mock.patch('boto.config.get_value', side_effect=MockValidStrings): 274 self.assertEqual( 275 'check_hashes:if_fast_else_skip,content_language:chi,' 276 'disable_analytics_prompt:True,https_validate_certificates:True,' 277 'json_api_version:v3,prefer_api:xml,tab_completion_time_logs:True,' 278 'token_cache:file_system,use_magicfile:True', 279 self.collector._ValidateAndGetConfigValues()) 280 281 # Test that "small" and "large" integers are appropriately validated. 282 def MockValidSmallInts(_, category): 283 if category in int_categories: 284 return '1999' 285 return '' 286 with mock.patch('boto.config.get_value', side_effect=MockValidSmallInts): 287 self.assertEqual( 288 'debug:1999,default_api_version:1999,http_socket_timeout:1999,' 289 'max_retry_delay:1999,num_retries:1999,oauth2_refresh_retries:1999,' 290 'parallel_process_count:1999,parallel_thread_count:1999,' 291 'resumable_threshold:1999,rsync_buffer_lines:1999,' 292 'sliced_object_download_max_components:1999,' 293 'software_update_check_period:1999,tab_completion_timeout:1999,' 294 'task_estimation_threshold:1999', 295 self.collector._ValidateAndGetConfigValues()) 296 297 def MockValidLargeInts(_, category): 298 if category in int_categories: 299 return '2001' 300 return '' 301 with mock.patch('boto.config.get_value', side_effect=MockValidLargeInts): 302 self.assertEqual( 303 'debug:INVALID,default_api_version:INVALID,' 304 'http_socket_timeout:INVALID,max_retry_delay:INVALID,' 305 'num_retries:INVALID,oauth2_refresh_retries:INVALID,' 306 'parallel_process_count:INVALID,parallel_thread_count:INVALID,' 307 'resumable_threshold:2001,rsync_buffer_lines:2001,' 308 'sliced_object_download_max_components:INVALID,' 309 'software_update_check_period:INVALID,' 310 'tab_completion_timeout:INVALID,task_estimation_threshold:2001', 311 self.collector._ValidateAndGetConfigValues()) 312 313 # Test that a non-integer return value is invalid. 314 def MockNonIntegerValue(_, category): 315 if category in int_categories: 316 return '10.28' 317 return '' 318 with mock.patch('boto.config.get_value', side_effect=MockNonIntegerValue): 319 self.assertEqual( 320 ','.join([category + ':INVALID' for category in int_categories]), 321 self.collector._ValidateAndGetConfigValues()) 322 323 # Test data size validation. 324 def MockDataSizeValue(_, category): 325 if category in ('parallel_composite_upload_component_size', 326 'parallel_composite_upload_threshold', 327 'sliced_object_download_component_size', 328 'sliced_object_download_threshold'): 329 return '10MiB' 330 return '' 331 with mock.patch('boto.config.get_value', side_effect=MockDataSizeValue): 332 self.assertEqual('parallel_composite_upload_component_size:10485760,' 333 'parallel_composite_upload_threshold:10485760,' 334 'sliced_object_download_component_size:10485760,' 335 'sliced_object_download_threshold:10485760', 336 self.collector._ValidateAndGetConfigValues()) 337 338 def testCommandAndErrorEventsCollection(self): 339 """Tests the collection of command and error GA events.""" 340 self.assertEqual([], self.collector._metrics) 341 342 _LogAllTestMetrics() 343 # Only the first command should be logged. 344 metrics.LogCommandParams(command_name='cmd2') 345 346 # Commands and errors should not be collected until we explicitly collect 347 # them. 348 self.assertEqual([], self.collector._metrics) 349 self.collector._CollectCommandAndErrorMetrics() 350 self.assertEqual(COMMAND_AND_ERROR_TEST_METRICS, 351 set(self.collector._metrics)) 352 353 def testPerformanceSummaryEventCollection(self): 354 """Test the collection of PerformanceSummary GA events.""" 355 # PerformanceSummaries are only collected for cp and rsync. 356 self.collector.ga_params[metrics._GA_LABEL_MAP['Command Name']] = 'cp' 357 # GetDiskCounters is called at initialization of _PerformanceSummaryParams, 358 # which occurs during the first call to LogPerformanceSummaryParams. 359 with mock.patch('gslib.metrics.GetDiskCounters', 360 return_value={'fake-disk': (0, 0, 0, 0, 0, 0)}): 361 metrics.LogPerformanceSummaryParams( 362 uses_fan=True, uses_slice=True, avg_throughput=10, 363 is_daisy_chain=True, has_file_dst=False, has_cloud_dst=True, 364 has_file_src=False, has_cloud_src=True, total_bytes_transferred=100, 365 total_elapsed_time=10, thread_idle_time=40, thread_execution_time=10, 366 num_processes=2, num_threads=3, num_objects_transferred=3, 367 provider_types=['gs']) 368 369 # Log a retryable service error and two retryable network errors. 370 service_retry_msg = RetryableErrorMessage( 371 apitools_exceptions.CommunicationError(), 0) 372 network_retry_msg = RetryableErrorMessage(socket.error(), 0) 373 metrics.LogRetryableError(service_retry_msg) 374 metrics.LogRetryableError(network_retry_msg) 375 metrics.LogRetryableError(network_retry_msg) 376 377 # Log some thread throughput. 378 start_file_msg = FileMessage('src', 'dst', 0, size=100) 379 end_file_msg = FileMessage('src', 'dst', 10, finished=True) 380 start_file_msg.thread_id = end_file_msg.thread_id = 1 381 start_file_msg.process_id = end_file_msg.process_id = 1 382 metrics.LogPerformanceSummaryParams(file_message=start_file_msg) 383 metrics.LogPerformanceSummaryParams(file_message=end_file_msg) 384 self.assertEqual(self.collector.perf_sum_params.thread_throughputs[ 385 (1, 1)].GetThroughput(), 10) 386 387 # GetDiskCounters is called a second time during collection. 388 with mock.patch('gslib.metrics.GetDiskCounters', 389 return_value={'fake-disk': (0, 0, 0, 0, 10, 10)}): 390 self.collector._CollectPerformanceSummaryMetric() 391 392 # Check for all the expected parameters. 393 metric_body = self.collector._metrics[0].body 394 label_and_value_pairs = [ 395 ('Event Category', metrics._GA_PERFSUM_CATEGORY), 396 ('Event Action', 'CloudToCloud%2CDaisyChain'), ('Execution Time', '10'), 397 ('Parallelism Strategy', 'both'), ('Source URL Type', 'cloud'), 398 ('Provider Types', 'gs'), ('Num Processes', '2'), ('Num Threads', '3'), 399 ('Number of Files/Objects Transferred', '3'), 400 ('Size of Files/Objects Transferred', '100'), 401 ('Average Overall Throughput', '10'), 402 ('Num Retryable Service Errors', '1'), 403 ('Num Retryable Network Errors', '2'), 404 ('Thread Idle Time Percent', '0.8'), 405 ('Slowest Thread Throughput', '10'), 406 ('Fastest Thread Throughput', '10'), 407 ] 408 if IS_LINUX: # Disk I/O time is only available on Linux. 409 label_and_value_pairs.append(('Disk I/O Time', '20')) 410 for label, exp_value in label_and_value_pairs: 411 self.assertIn('{0}={1}'.format(metrics._GA_LABEL_MAP[label], exp_value), 412 metric_body) 413 414 def testCommandCollection(self): 415 """Tests the collection of command parameters.""" 416 _TryExceptAndPass(self.command_runner.RunNamedCommand, 417 'acl', ['set', '-a'], collect_analytics=True) 418 self.assertEqual( 419 'acl set', 420 self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Name'])) 421 self.assertEqual('a', self.collector.ga_params.get(metrics._GA_LABEL_MAP[ 422 'Command-Level Options'])) 423 424 # Reset the ga_params, which store the command info. 425 self.collector.ga_params.clear() 426 427 self.command_runner.RunNamedCommand('list', collect_analytics=True) 428 self.assertEqual( 429 'ls', 430 self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Name'])) 431 self.assertEqual( 432 'list', 433 self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Alias'])) 434 435 self.collector.ga_params.clear() 436 _TryExceptAndPass( 437 self.command_runner.RunNamedCommand, 438 'iam', ['get', 'dummy_bucket'], collect_analytics=True) 439 self.assertEqual( 440 'iam get', 441 self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Name'])) 442 443 # We only care about the error logging, not the actual exceptions handling. 444 @mock.patch.object(http_wrapper, 'HandleExceptionsAndRebuildHttpConnections') 445 def testRetryableErrorCollection(self, mock_default_retry): 446 """Tests the collection of a retryable error in the retry function.""" 447 # A DiscardMessagesQueue has the same retryable error-logging code as the 448 # UIThread and the MainThreadUIQueue. 449 mock_queue = RetryableErrorsQueue() 450 value_error_retry_args = http_wrapper.ExceptionRetryArgs(None, None, 451 ValueError(), None, 452 None, None) 453 socket_error_retry_args = http_wrapper.ExceptionRetryArgs(None, None, 454 socket.error(), 455 None, None, None) 456 metadata_retry_func = LogAndHandleRetries(is_data_transfer=False, 457 status_queue=mock_queue) 458 media_retry_func = LogAndHandleRetries(is_data_transfer=True, 459 status_queue=mock_queue) 460 461 metadata_retry_func(value_error_retry_args) 462 self.assertEqual(self.collector.retryable_errors['ValueError'], 1) 463 metadata_retry_func(value_error_retry_args) 464 self.assertEqual(self.collector.retryable_errors['ValueError'], 2) 465 metadata_retry_func(socket_error_retry_args) 466 self.assertEqual(self.collector.retryable_errors['SocketError'], 1) 467 468 # The media retry function raises an exception after logging because 469 # the GcsJsonApi handles retryable errors for media transfers itself. 470 _TryExceptAndPass(media_retry_func, value_error_retry_args) 471 _TryExceptAndPass(media_retry_func, socket_error_retry_args) 472 self.assertEqual(self.collector.retryable_errors['ValueError'], 3) 473 self.assertEqual(self.collector.retryable_errors['SocketError'], 2) 474 475 def testExceptionCatchingDecorator(self): 476 """Tests the exception catching decorator CaptureAndLogException.""" 477 478 # A wrapped function with an exception should not stop the process. 479 mock_exc_fn = mock.MagicMock(__name__='mock_exc_fn', 480 side_effect=Exception()) 481 wrapped_fn = metrics.CaptureAndLogException(mock_exc_fn) 482 wrapped_fn() 483 484 debug_messages = self.log_handler.messages['debug'] 485 self.assertIn('Exception captured in mock_exc_fn during metrics collection', 486 debug_messages[0]) 487 self.log_handler.reset() 488 489 self.assertEqual(1, mock_exc_fn.call_count) 490 491 mock_err_fn = mock.MagicMock(__name__='mock_err_fn', 492 side_effect=TypeError()) 493 wrapped_fn = metrics.CaptureAndLogException(mock_err_fn) 494 wrapped_fn() 495 self.assertEqual(1, mock_err_fn.call_count) 496 497 debug_messages = self.log_handler.messages['debug'] 498 self.assertIn('Exception captured in mock_err_fn during metrics collection', 499 debug_messages[0]) 500 self.log_handler.reset() 501 502 # Test that exceptions in the unprotected metrics functions are caught. 503 with mock.patch.object(MetricsCollector, 'GetCollector', 504 return_value='not a collector'): 505 # These calls should all fail, but the exceptions shouldn't propagate up. 506 metrics.Shutdown() 507 metrics.LogCommandParams() 508 metrics.LogRetryableError() 509 metrics.LogFatalError() 510 metrics.LogPerformanceSummaryParams() 511 metrics.CheckAndMaybePromptForAnalyticsEnabling('invalid argument') 512 513 debug_messages = self.log_handler.messages['debug'] 514 message_index = 0 515 for func_name in ('Shutdown', 'LogCommandParams', 'LogRetryableError', 516 'LogFatalError', 'LogPerformanceSummaryParams', 517 'CheckAndMaybePromptForAnalyticsEnabling'): 518 self.assertIn( 519 'Exception captured in %s during metrics collection' % func_name, 520 debug_messages[message_index]) 521 message_index += 1 522 523 self.log_handler.reset() 524 525 526# Mock callback handlers to throw errors in integration tests, based on handlers 527# from test_cp.py. 528class _JSONForceHTTPErrorCopyCallbackHandler(object): 529 """Test callback handler that raises an arbitrary HTTP error exception.""" 530 531 def __init__(self, startover_at_byte, http_error_num): 532 self._startover_at_byte = startover_at_byte 533 self._http_error_num = http_error_num 534 self.started_over_once = False 535 536 # pylint: disable=invalid-name 537 def call(self, total_bytes_transferred, unused_total_size): 538 """Forcibly exits if the transfer has passed the halting point.""" 539 if (total_bytes_transferred >= self._startover_at_byte and 540 not self.started_over_once): 541 self.started_over_once = True 542 raise apitools_exceptions.HttpError({'status': self._http_error_num}, 543 None, None) 544 545 546class _ResumableUploadRetryHandler(object): 547 """Test callback handler for causing retries during a resumable transfer.""" 548 549 def __init__(self, retry_at_byte, exception_to_raise, exc_args, 550 num_retries=1): 551 self._retry_at_byte = retry_at_byte 552 self._exception_to_raise = exception_to_raise 553 self._exception_args = exc_args 554 self._num_retries = num_retries 555 556 self._retries_made = 0 557 558 # pylint: disable=invalid-name 559 def call(self, total_bytes_transferred, unused_total_size): 560 """Cause a single retry at the retry point.""" 561 if (total_bytes_transferred >= self._retry_at_byte and 562 self._retries_made < self._num_retries): 563 self._retries_made += 1 564 raise self._exception_to_raise(*self._exception_args) 565 566 567class TestMetricsIntegrationTests(testcase.GsUtilIntegrationTestCase): 568 """Integration tests for analytics data collection.""" 569 570 def setUp(self): 571 super(TestMetricsIntegrationTests, self).setUp() 572 573 # Save the original state of the collector. 574 self.original_collector_instance = MetricsCollector.GetCollector() 575 576 # Set dummy attributes for the collector. 577 MetricsCollector.StartTestCollector('https://example.com', 'user-agent-007', 578 {'a': 'b', 'c': 'd'}) 579 self.collector = MetricsCollector.GetCollector() 580 581 def tearDown(self): 582 super(TestMetricsIntegrationTests, self).tearDown() 583 584 # Reset to default collection settings. 585 MetricsCollector.StopTestCollector( 586 original_instance=self.original_collector_instance) 587 588 def _RunGsUtilWithAnalyticsOutput(self, cmd, expected_status=0): 589 """Runs the gsutil command to check for metrics log output. 590 591 The env value is set so that the metrics collector in the subprocess will 592 use testing parameters and output the metrics collected to the debugging 593 log, which lets us check for proper collection in the stderr. 594 595 Args: 596 cmd: The command to run, as a list. 597 expected_status: The expected return code. 598 599 Returns: 600 The string of metrics output. 601 """ 602 stderr = self.RunGsUtil(['-d'] + cmd, return_stderr=True, 603 expected_status=expected_status, 604 env_vars={'GSUTIL_TEST_ANALYTICS': '2'}) 605 return METRICS_LOG_RE.search(stderr).group() 606 607 def _StartObjectPatch(self, *args, **kwargs): 608 """Runs mock.patch.object with the given args, and returns the mock object. 609 610 This starts the patcher, returns the mock object, and registers the patcher 611 to stop on test teardown. 612 613 Args: 614 *args: The args to pass to mock.patch.object() 615 **kwargs: The kwargs to pass to mock.patch.object() 616 617 Returns: 618 Mock, The result of starting the patcher. 619 """ 620 patcher = mock.patch.object(*args, **kwargs) 621 self.addCleanup(patcher.stop) 622 return patcher.start() 623 624 def _CheckParameterValue(self, param_name, exp_value, metrics_to_search): 625 """Checks for a correct key=value pair in a log output string.""" 626 self.assertIn( 627 '{0}={1}'.format(metrics._GA_LABEL_MAP[param_name], exp_value), 628 metrics_to_search) 629 630 @mock.patch('time.time', new=mock.MagicMock(return_value=0)) 631 def testMetricsReporting(self): 632 """Tests the subprocess creation by Popen in metrics.py.""" 633 popen_mock = self._StartObjectPatch(subprocess, 'Popen') 634 635 # Set up the temp file for pickle dumping metrics into. 636 metrics_file = tempfile.NamedTemporaryFile() 637 metrics_file.close() 638 temp_file_mock = self._StartObjectPatch(tempfile, 'NamedTemporaryFile') 639 temp_file_mock.return_value = open(metrics_file.name, 'wb') 640 641 # If there are no metrics, Popen should not be called. 642 self.collector.ReportMetrics() 643 self.assertEqual(0, popen_mock.call_count) 644 645 _LogAllTestMetrics() 646 647 # Report the metrics and check Popen calls. 648 metrics.Shutdown() 649 call_list = popen_mock.call_args_list 650 self.assertEqual(1, len(call_list)) 651 # Check to make sure that we have the proper PYTHONPATH in the subprocess. 652 args = call_list[0] 653 self.assertIn('PYTHONPATH', args[1]['env']) 654 # Ensure that we can access the same modules as the main process from 655 # PYTHONPATH. 656 missing_paths = ( 657 set(sys.path) - set(args[1]['env']['PYTHONPATH'].split(os.pathsep))) 658 self.assertEqual(set(), missing_paths) 659 660 # Check that the metrics were correctly dumped into the temp file. 661 with open(metrics_file.name, 'rb') as metrics_file: 662 reported_metrics = pickle.load(metrics_file) 663 self.assertEqual(COMMAND_AND_ERROR_TEST_METRICS, 664 set(reported_metrics)) 665 666 @mock.patch('time.time', new=mock.MagicMock(return_value=0)) 667 def testMetricsPosting(self): 668 """Tests the metrics posting process as performed in metrics_reporter.py.""" 669 # Clear the log file. 670 open(LOG_FILE_PATH, 'w').close() 671 metrics.LogCommandParams( 672 global_opts=[('-y', 'value'), ('-z', ''), ('-x', '')]) 673 674 # Collect a metric and set log level for the metrics_reporter subprocess. 675 def CollectMetricAndSetLogLevel(log_level): 676 metrics.LogCommandParams(command_name='cmd1', subcommands=['action1'], 677 sub_opts=[('optb', ''), ('opta', '')]) 678 metrics.LogFatalError(gslib.exception.CommandException('test')) 679 680 # Wait for report to make sure the log is written before we check it. 681 self.collector.ReportMetrics(wait_for_report=True, log_level=log_level) 682 self.assertEqual([], self.collector._metrics) 683 684 # The log file should be empty unless the debug option is specified. 685 CollectMetricAndSetLogLevel(logging.DEBUG) 686 with open(LOG_FILE_PATH, 'rb') as metrics_log: 687 log_text = metrics_log.read() 688 expected_response = ( 689 'Metric(endpoint=\'https://example.com\', method=\'POST\', ' 690 'body=\'{0}&cm2=0&ea=cmd1+action1&ec={1}&el={2}&ev=0\', ' 691 'user_agent=\'user-agent-007\')'.format(GLOBAL_DIMENSIONS_URL_PARAMS, 692 metrics._GA_COMMANDS_CATEGORY, 693 VERSION)) 694 self.assertIn(expected_response, log_text) 695 self.assertIn('RESPONSE: 200', log_text) 696 697 CollectMetricAndSetLogLevel(logging.INFO) 698 with open(LOG_FILE_PATH, 'rb') as metrics_log: 699 log_text = metrics_log.read() 700 self.assertEqual(log_text, '') 701 702 CollectMetricAndSetLogLevel(logging.WARN) 703 with open(LOG_FILE_PATH, 'rb') as metrics_log: 704 log_text = metrics_log.read() 705 self.assertEqual(log_text, '') 706 707 def testMetricsReportingWithFail(self): 708 """Tests that metrics reporting error does not throw an exception.""" 709 popen_mock = self._StartObjectPatch(subprocess, 'Popen') 710 popen_mock.side_effect = OSError() 711 712 self.collector._metrics.append('dummy metric') 713 # Shouldn't raise an exception. 714 self.collector.ReportMetrics() 715 716 self.assertTrue(popen_mock.called) 717 718 def testCommandCollection(self): 719 """Tests the collection of commands.""" 720 metrics_list = self._RunGsUtilWithAnalyticsOutput( 721 ['-m', 'acl', 'set', '-a'], expected_status=1) 722 self._CheckParameterValue('Event Category', metrics._GA_COMMANDS_CATEGORY, 723 metrics_list) 724 self._CheckParameterValue('Event Action', 'acl+set', metrics_list) 725 # Check that the options were collected. 726 self._CheckParameterValue('Global Options', 'd%2Cm', metrics_list) 727 self._CheckParameterValue('Command-Level Options', 'a', metrics_list) 728 729 metrics_list = self._RunGsUtilWithAnalyticsOutput(['ver']) 730 self._CheckParameterValue('Event Category', metrics._GA_COMMANDS_CATEGORY, 731 metrics_list) 732 self._CheckParameterValue('Event Action', 'version', metrics_list) 733 # Check the recording of the command alias. 734 self._CheckParameterValue('Command Alias', 'ver', metrics_list) 735 736 def testRetryableErrorMetadataCollection(self): 737 """Tests that retryable errors are collected on JSON metadata operations.""" 738 # Retryable errors will only be collected with the JSON API. 739 if self.test_api != ApiSelector.JSON: 740 return unittest.skip('Retryable errors are only collected in JSON') 741 742 bucket_uri = self.CreateBucket() 743 object_uri = self.CreateObject(bucket_uri=bucket_uri, 744 object_name='foo', contents='bar') 745 # Set the command name to rsync in order to collect PerformanceSummary info. 746 self.collector.ga_params[metrics._GA_LABEL_MAP['Command Name']] = 'rsync' 747 # Generate a JSON API instance to test with, because the RunGsUtil method 748 # may use the XML API. 749 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 750 RetryableErrorsQueue(), self.default_provider) 751 # Don't wait for too many retries or for long periods between retries to 752 # avoid long tests. 753 gsutil_api.api_client.num_retries = 2 754 gsutil_api.api_client.max_retry_wait = 1 755 756 # Throw an error when transferring metadata. 757 key = object_uri.get_key() 758 src_obj_metadata = apitools_messages.Object(name=key.name, 759 bucket=key.bucket.name, 760 contentType=key.content_type) 761 dst_obj_metadata = apitools_messages.Object( 762 bucket=src_obj_metadata.bucket, 763 name=self.MakeTempName('object'), 764 contentType=src_obj_metadata.contentType) 765 with mock.patch.object(http_wrapper, '_MakeRequestNoRetry', 766 side_effect=socket.error()): 767 _TryExceptAndPass(gsutil_api.CopyObject, src_obj_metadata, 768 dst_obj_metadata) 769 self.assertEqual(self.collector.retryable_errors['SocketError'], 1) 770 771 # Throw an error when removing a bucket. 772 with mock.patch.object( 773 http_wrapper, '_MakeRequestNoRetry', 774 side_effect=apitools_exceptions.HttpError('unused', 'unused', 775 'unused')): 776 _TryExceptAndPass(gsutil_api.DeleteObject, bucket_uri.bucket_name, 777 object_uri.object_name) 778 self.assertEqual(self.collector.retryable_errors['HttpError'], 1) 779 780 # Check that the number of each kind of retryable error was logged. 781 self.assertEqual( 782 self.collector.perf_sum_params.num_retryable_network_errors, 1) 783 self.assertEqual( 784 self.collector.perf_sum_params.num_retryable_service_errors, 1) 785 786 def testRetryableErrorMediaCollection(self): 787 """Tests that retryable errors are collected on JSON media operations.""" 788 # Retryable errors will only be collected with the JSON API. 789 if self.test_api != ApiSelector.JSON: 790 return unittest.skip('Retryable errors are only collected in JSON') 791 792 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB))] 793 bucket_uri = self.CreateBucket() 794 # For the resumable upload exception, we need to ensure at least one 795 # callback occurs. 796 halt_size = START_CALLBACK_PER_BYTES * 2 797 fpath = self.CreateTempFile(contents='a' * halt_size) 798 799 # Test that the retry function for data transfers catches and logs an error. 800 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 801 _ResumableUploadRetryHandler(5, apitools_exceptions.BadStatusCodeError, 802 ('unused', 'unused', 'unused')))) 803 with SetBotoConfigForTest(boto_config_for_test): 804 metrics_list = self._RunGsUtilWithAnalyticsOutput( 805 ['cp', '--testcallbackfile', test_callback_file, 806 fpath, suri(bucket_uri)]) 807 self._CheckParameterValue('Event Category', 808 metrics._GA_ERRORRETRY_CATEGORY, metrics_list) 809 self._CheckParameterValue('Event Action', 'BadStatusCodeError', 810 metrics_list) 811 self._CheckParameterValue('Retryable Errors', '1', metrics_list) 812 self._CheckParameterValue('Num Retryable Service Errors', '1', 813 metrics_list) 814 815 # Test that the ResumableUploadStartOverException in copy_helper is caught. 816 test_callback_file = self.CreateTempFile( 817 contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404))) 818 with SetBotoConfigForTest(boto_config_for_test): 819 metrics_list = self._RunGsUtilWithAnalyticsOutput( 820 ['cp', '--testcallbackfile', test_callback_file, 821 fpath, suri(bucket_uri)]) 822 self._CheckParameterValue( 823 'Event Category', metrics._GA_ERRORRETRY_CATEGORY, metrics_list) 824 self._CheckParameterValue( 825 'Event Action', 'ResumableUploadStartOverException', metrics_list) 826 self._CheckParameterValue('Retryable Errors', '1', metrics_list) 827 self._CheckParameterValue( 828 'Num Retryable Service Errors', '1', metrics_list) 829 830 # Test retryable error collection in a multithread/multiprocess situation. 831 test_callback_file = self.CreateTempFile( 832 contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404))) 833 with SetBotoConfigForTest(boto_config_for_test): 834 metrics_list = self._RunGsUtilWithAnalyticsOutput( 835 ['-m', 'cp', '--testcallbackfile', 836 test_callback_file, fpath, suri(bucket_uri)]) 837 self._CheckParameterValue('Event Category', 838 metrics._GA_ERRORRETRY_CATEGORY, metrics_list) 839 self._CheckParameterValue( 840 'Event Action', 'ResumableUploadStartOverException', metrics_list) 841 self._CheckParameterValue('Retryable Errors', '1', metrics_list) 842 self._CheckParameterValue( 843 'Num Retryable Service Errors', '1', metrics_list) 844 845 def testFatalErrorCollection(self): 846 """Tests that fatal errors are collected.""" 847 def CheckForCommandException(log_output): 848 self._CheckParameterValue('Event Category', 849 metrics._GA_ERRORFATAL_CATEGORY, log_output) 850 self._CheckParameterValue('Event Action', 'CommandException', log_output) 851 852 metrics_list = self._RunGsUtilWithAnalyticsOutput( 853 ['invalid-command'], expected_status=1) 854 CheckForCommandException(metrics_list) 855 856 metrics_list = self._RunGsUtilWithAnalyticsOutput( 857 ['mb', '-invalid-option'], expected_status=1) 858 CheckForCommandException(metrics_list) 859 860 bucket_uri = self.CreateBucket() 861 metrics_list = self._RunGsUtilWithAnalyticsOutput( 862 ['cp', suri(bucket_uri), suri(bucket_uri)], expected_status=1) 863 CheckForCommandException(metrics_list) 864 865 def _GetAndCheckAllNumberMetrics(self, metrics_to_search, multithread=True): 866 """Checks number metrics for PerformanceSummary tests. 867 868 Args: 869 metrics_to_search: The string of metrics to search. 870 multithread: False if the the metrics were collected in a non-multithread 871 situation. 872 873 Returns: 874 (slowest_throughput, fastest_throughput, io_time) as floats. 875 """ 876 def _ExtractNumberMetric(param_name): 877 extracted_match = re.search( 878 metrics._GA_LABEL_MAP[param_name] + r'=(\d+\.?\d*)&', 879 metrics_to_search) 880 if not extracted_match: 881 self.fail('Could not find %s (%s) in metrics string %s' % 882 (metrics._GA_LABEL_MAP[param_name], param_name, 883 metrics_to_search)) 884 return float(extracted_match.group(1)) 885 886 # Thread idle time will only be recorded in a multithread situation. 887 if multithread: 888 thread_idle_time = _ExtractNumberMetric('Thread Idle Time Percent') 889 # This should be a decimal between 0 and 1. 890 self.assertGreaterEqual(thread_idle_time, 0) 891 self.assertLessEqual(thread_idle_time, 1) 892 893 throughput = _ExtractNumberMetric('Average Overall Throughput') 894 self.assertGreater(throughput, 0) 895 896 slowest_throughput = _ExtractNumberMetric('Slowest Thread Throughput') 897 fastest_throughput = _ExtractNumberMetric('Fastest Thread Throughput') 898 self.assertGreaterEqual(fastest_throughput, slowest_throughput) 899 self.assertGreater(slowest_throughput, 0) 900 self.assertGreater(fastest_throughput, 0) 901 902 io_time = None 903 if IS_LINUX: 904 io_time = _ExtractNumberMetric('Disk I/O Time') 905 self.assertGreaterEqual(io_time, 0) 906 907 # Return some metrics to run tests in more specific scenarios. 908 return (slowest_throughput, fastest_throughput, io_time) 909 910 def testPerformanceSummaryFileToFile(self): 911 """Tests PerformanceSummary collection in a file-to-file transfer.""" 912 tmpdir1 = self.CreateTempDir() 913 tmpdir2 = self.CreateTempDir() 914 file_size = ONE_MIB 915 self.CreateTempFile(tmpdir=tmpdir1, contents='a' * file_size) 916 917 # Run an rsync file-to-file command with fan parallelism, without slice 918 # parallelism. 919 process_count = 1 if IS_WINDOWS else 6 920 with SetBotoConfigForTest([ 921 ('GSUtil', 'parallel_process_count', str(process_count)), 922 ('GSUtil', 'parallel_thread_count', '7')]): 923 metrics_list = self._RunGsUtilWithAnalyticsOutput( 924 ['-m', 'rsync', tmpdir1, tmpdir2]) 925 self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY, 926 metrics_list) 927 self._CheckParameterValue('Event Action', 'FileToFile', metrics_list) 928 self._CheckParameterValue('Parallelism Strategy', 'fan', metrics_list) 929 self._CheckParameterValue('Source URL Type', 'file', metrics_list) 930 self._CheckParameterValue('Num Processes', str(process_count), 931 metrics_list) 932 self._CheckParameterValue('Num Threads', '7', metrics_list) 933 self._CheckParameterValue('Provider Types', 'file', metrics_list) 934 self._CheckParameterValue('Size of Files/Objects Transferred', file_size, 935 metrics_list) 936 self._CheckParameterValue('Number of Files/Objects Transferred', 1, 937 metrics_list) 938 939 (_, _, io_time) = self._GetAndCheckAllNumberMetrics(metrics_list) 940 if IS_LINUX: # io_time will be None on other platforms. 941 # We can't guarantee that the file read/write will consume a 942 # reportable amount of disk I/O, but it should be reported as >= 0. 943 self.assertGreaterEqual(io_time, 0) 944 945 @SkipForS3('No slice parallelism support for S3.') 946 def testPerformanceSummaryFileToCloud(self): 947 """Tests PerformanceSummary collection in a file-to-cloud transfer.""" 948 bucket_uri = self.CreateBucket() 949 tmpdir = self.CreateTempDir() 950 file_size = 6 951 self.CreateTempFile(tmpdir=tmpdir, contents='a' * file_size) 952 self.CreateTempFile(tmpdir=tmpdir, contents='b' * file_size) 953 954 process_count = 1 if IS_WINDOWS else 2 955 # Run a parallel composite upload without fan parallelism. 956 with SetBotoConfigForTest([ 957 ('GSUtil', 'parallel_process_count', str(process_count)), 958 ('GSUtil', 'parallel_thread_count', '3'), 959 ('GSUtil', 'parallel_composite_upload_threshold', '1')]): 960 metrics_list = self._RunGsUtilWithAnalyticsOutput( 961 ['rsync', tmpdir, suri(bucket_uri)]) 962 self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY, 963 metrics_list) 964 self._CheckParameterValue('Event Action', 'FileToCloud', metrics_list) 965 self._CheckParameterValue('Parallelism Strategy', 'slice', metrics_list) 966 self._CheckParameterValue('Num Processes', str(process_count), 967 metrics_list) 968 self._CheckParameterValue('Num Threads', '3', metrics_list) 969 self._CheckParameterValue('Provider Types', 'file%2C' + bucket_uri.scheme, 970 metrics_list) 971 self._CheckParameterValue('Size of Files/Objects Transferred', 972 2 * file_size, metrics_list) 973 self._CheckParameterValue('Number of Files/Objects Transferred', 2, 974 metrics_list) 975 (_, _, io_time) = self._GetAndCheckAllNumberMetrics(metrics_list) 976 if IS_LINUX: # io_time will be None on other platforms. 977 # We can't guarantee that the file read will consume a 978 # reportable amount of disk I/O, but it should be reported as >= 0. 979 self.assertGreaterEqual(io_time, 0) 980 981 @SkipForS3('No slice parallelism support for S3.') 982 def testPerformanceSummaryCloudToFile(self): 983 """Tests PerformanceSummary collection in a cloud-to-file transfer.""" 984 bucket_uri = self.CreateBucket() 985 file_size = 6 986 object_uri = self.CreateObject( 987 bucket_uri=bucket_uri, contents='a' * file_size) 988 989 fpath = self.CreateTempFile() 990 # Run a sliced object download with fan parallelism. 991 process_count = 1 if IS_WINDOWS else 4 992 with SetBotoConfigForTest([ 993 ('GSUtil', 'parallel_process_count', str(process_count)), 994 ('GSUtil', 'parallel_thread_count', '5'), 995 ('GSUtil', 'sliced_object_download_threshold', '1'), 996 ('GSUtil', 'test_assume_fast_crcmod', 'True')]): 997 metrics_list = self._RunGsUtilWithAnalyticsOutput( 998 ['-m', 'cp', suri(object_uri), fpath]) 999 self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY, 1000 metrics_list) 1001 self._CheckParameterValue('Event Action', 'CloudToFile', metrics_list) 1002 self._CheckParameterValue('Parallelism Strategy', 'both', metrics_list) 1003 self._CheckParameterValue('Num Processes', str(process_count), 1004 metrics_list) 1005 self._CheckParameterValue('Num Threads', '5', metrics_list) 1006 self._CheckParameterValue('Provider Types', 'file%2C' + bucket_uri.scheme, 1007 metrics_list) 1008 self._CheckParameterValue('Number of Files/Objects Transferred', '1', 1009 metrics_list) 1010 self._CheckParameterValue('Size of Files/Objects Transferred', file_size, 1011 metrics_list) 1012 (_, _, io_time) = self._GetAndCheckAllNumberMetrics(metrics_list) 1013 if IS_LINUX: # io_time will be None on other platforms. 1014 # We can't guarantee that the file write will consume a 1015 # reportable amount of disk I/O, but it should be reported as >= 0. 1016 self.assertGreaterEqual(io_time, 0) 1017 1018 def testPerformanceSummaryCloudToCloud(self): 1019 """Tests PerformanceSummary collection in a cloud-to-cloud transfer.""" 1020 bucket1_uri = self.CreateBucket() 1021 bucket2_uri = self.CreateBucket() 1022 file_size = 6 1023 key_uri = self.CreateObject( 1024 bucket_uri=bucket1_uri, contents='a' * file_size) 1025 1026 # Run a daisy-chain cloud-to-cloud copy without parallelism. 1027 metrics_list = self._RunGsUtilWithAnalyticsOutput( 1028 ['cp', '-D', suri(key_uri), suri(bucket2_uri)]) 1029 1030 (slowest_throughput, fastest_throughput, 1031 _) = self._GetAndCheckAllNumberMetrics(metrics_list, multithread=False) 1032 # Since there's a single thread, this must be the case. 1033 self.assertEqual(slowest_throughput, fastest_throughput) 1034 1035 self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY, 1036 metrics_list) 1037 self._CheckParameterValue('Event Action', 'CloudToCloud%2CDaisyChain', 1038 metrics_list) 1039 self._CheckParameterValue('Parallelism Strategy', 'none', metrics_list) 1040 self._CheckParameterValue('Source URL Type', 'cloud', metrics_list) 1041 self._CheckParameterValue('Num Processes', '1', metrics_list) 1042 self._CheckParameterValue('Num Threads', '1', metrics_list) 1043 self._CheckParameterValue('Provider Types', bucket1_uri.scheme, 1044 metrics_list) 1045 self._CheckParameterValue('Number of Files/Objects Transferred', '1', 1046 metrics_list) 1047 self._CheckParameterValue('Size of Files/Objects Transferred', file_size, 1048 metrics_list) 1049 1050 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1051 def testCrossProviderDaisyChainCollection(self): 1052 """Tests the collection of daisy-chain operations.""" 1053 s3_bucket = self.CreateBucket(provider='s3') 1054 gs_bucket = self.CreateBucket(provider='gs') 1055 unused_s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo') 1056 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar') 1057 1058 metrics_list = self._RunGsUtilWithAnalyticsOutput( 1059 ['rsync', suri(s3_bucket), suri(gs_bucket)]) 1060 self._CheckParameterValue('Event Action', 'CloudToCloud%2CDaisyChain', 1061 metrics_list) 1062 self._CheckParameterValue('Provider Types', 'gs%2Cs3', metrics_list) 1063 1064 metrics_list = self._RunGsUtilWithAnalyticsOutput( 1065 ['cp', suri(gs_key), suri(s3_bucket)]) 1066 self._CheckParameterValue('Event Action', 'CloudToCloud%2CDaisyChain', 1067 metrics_list) 1068 self._CheckParameterValue('Provider Types', 'gs%2Cs3', metrics_list) 1069