# -*- coding: utf-8 -*-
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:mode=test
"""Unit tests for analytics data collection."""

from __future__ import absolute_import

import logging
import os
import pickle
import re
import socket
import subprocess
import sys
import tempfile

from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import http_wrapper
from boto.storage_uri import BucketStorageUri

from gslib import metrics
from gslib import VERSION
from gslib.cs_api_map import ApiSelector
import gslib.exception
from gslib.gcs_json_api import GcsJsonApi
from gslib.metrics import MetricsCollector
from gslib.metrics_reporter import LOG_FILE_PATH
from gslib.metrics_tuple import Metric
from gslib.tests.mock_logging_handler import MockLoggingHandler
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import HAS_S3_CREDS
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import unittest
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.thread_message import FileMessage
from gslib.thread_message import RetryableErrorMessage
from gslib.util import IS_LINUX
from gslib.util import IS_WINDOWS
from gslib.util import LogAndHandleRetries
from gslib.util import ONE_KIB
from gslib.util import ONE_MIB
from gslib.util import START_CALLBACK_PER_BYTES
import mock

# A portion of the URL that is logged for all of the tests.
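# Per metrics._GA_LABEL_MAP, cd1 carries the command name/action, cd2 the
# global options, cd3 the command-level options, cd6 the fatal error type, and
# cm1 the execution time (0 here because time.time is mocked to a constant).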
GLOBAL_DIMENSIONS_URL_PARAMS = (
    'a=b&c=d&cd1=cmd1+action1&cd10=0&cd2=x%2Cy%2Cz&cd3=opta%2Coptb&'
    'cd6=CommandException&cm1=0')

GLOBAL_PARAMETERS = ['a=b', 'c=d', 'cd1=cmd1 action1', 'cd2=x,y,z',
                     'cd3=opta,optb', 'cd6=CommandException', 'cm1=0',
                     'ev=0', 'el={0}'.format(VERSION)]
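# The events expected from _LogAllTestMetrics below: one command event, two
# retryable-error events, and one fatal-error event. cm2 carries the retry
# count: 3 in total on the command event, 2 and 1 on the per-error-type
# events.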
COMMAND_AND_ERROR_TEST_METRICS = set([
    Metric(
        'https://example.com', 'POST',
        '{0}&cm2=3&ea=cmd1+action1&ec={1}&el={2}&ev=0'.format(
            GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_COMMANDS_CATEGORY,
            VERSION),
        'user-agent-007'),
    Metric(
        'https://example.com', 'POST',
        '{0}&cm2=2&ea=Exception&ec={1}&el={2}&ev=0'.format(
            GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_ERRORRETRY_CATEGORY,
            VERSION),
        'user-agent-007'),
    Metric(
        'https://example.com', 'POST',
        '{0}&cm2=1&ea=ValueError&ec={1}&el={2}&ev=0'.format(
            GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_ERRORRETRY_CATEGORY,
            VERSION),
        'user-agent-007'),
    Metric(
        'https://example.com', 'POST',
        '{0}&ea=CommandException&ec={1}&el={2}&ev=0'.format(
            GLOBAL_DIMENSIONS_URL_PARAMS, metrics._GA_ERRORFATAL_CATEGORY,
            VERSION),
        'user-agent-007')
])

# A regex to find the list of metrics in log output.
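# The metrics collector prints them to the debug log as a Python list repr,
# e.g. "[Metric(endpoint='https://example.com', method='POST', ...)]".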
METRICS_LOG_RE = re.compile(r'(\[Metric.*\])')


def _TryExceptAndPass(func, *args, **kwargs):
  """Calls the given function with the arguments and ignores exceptions.

  In these tests, we often force a failure that doesn't matter in order to
  check that a metric was collected.

  Args:
    func: The function to call.
    *args: Any arguments to call the function with.
    **kwargs: Any named arguments to call the function with.
  """
  try:
    func(*args, **kwargs)
  except:  # pylint: disable=bare-except
    pass


def _LogAllTestMetrics():
  """Logs all the common metrics for a test."""
  metrics.LogCommandParams(
      command_name='cmd1', subcommands=['action1'],
      global_opts=[('-y', 'value'), ('-z', ''), ('-x', '')],
      sub_opts=[('optb', ''), ('opta', '')])
  retry_msg_1 = RetryableErrorMessage(Exception(), 0)
  retry_msg_2 = RetryableErrorMessage(ValueError(), 0)
  metrics.LogRetryableError(retry_msg_1)
  metrics.LogRetryableError(retry_msg_1)
  metrics.LogRetryableError(retry_msg_2)
  metrics.LogFatalError(gslib.exception.CommandException('test'))


class RetryableErrorsQueue(object):
  """Emulates Cloud API status queue; processes only RetryableErrorMessages."""

  def put(self, status_item):  # pylint: disable=invalid-name
    if isinstance(status_item, RetryableErrorMessage):
      metrics.LogRetryableError(status_item)


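# time.time() is mocked to a constant for the whole test case so that
# time-based metric values (such as execution time) are deterministic.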
@mock.patch('time.time', new=mock.MagicMock(return_value=0))
class TestMetricsUnitTests(testcase.GsUtilUnitTestCase):
  """Unit tests for analytics data collection."""

  def setUp(self):
    super(TestMetricsUnitTests, self).setUp()

    # Save the original state of the collector.
    self.original_collector_instance = MetricsCollector.GetCollector()

    # Set dummy attributes for the collector.
    MetricsCollector.StartTestCollector('https://example.com', 'user-agent-007',
                                        {'a': 'b', 'c': 'd'})
    self.collector = MetricsCollector.GetCollector()

    self.log_handler = MockLoggingHandler()
    # Use the metrics logger to avoid impacting the root logger, which may
    # interfere with other tests.
    logging.getLogger('metrics').setLevel(logging.DEBUG)
    logging.getLogger('metrics').addHandler(self.log_handler)

  def tearDown(self):
    super(TestMetricsUnitTests, self).tearDown()

    # Reset to default collection settings.
    MetricsCollector.StopTestCollector(
        original_instance=self.original_collector_instance)

  def testDisabling(self):
    """Tests enabling/disabling of metrics collection."""
    self.assertEqual(self.collector, MetricsCollector.GetCollector())

    # Test when gsutil is part of the Cloud SDK and the user opted in there.
    with mock.patch.dict(os.environ,
                         values={'CLOUDSDK_WRAPPER': '1',
                                 'GA_CID': '555'}):
      MetricsCollector._CheckAndSetDisabledCache()
      self.assertFalse(MetricsCollector._disabled_cache)
      self.assertEqual(self.collector, MetricsCollector.GetCollector())

    # Test when gsutil is part of the Cloud SDK and the user did not opt in
    # there.
    with mock.patch.dict(os.environ,
                         values={'CLOUDSDK_WRAPPER': '1',
                                 'GA_CID': ''}):
      MetricsCollector._CheckAndSetDisabledCache()
      self.assertTrue(MetricsCollector._disabled_cache)
      self.assertEqual(None, MetricsCollector.GetCollector())

    # Test when gsutil is not part of the Cloud SDK and there is no UUID file.
    with mock.patch.dict(os.environ, values={'CLOUDSDK_WRAPPER': ''}):
      with mock.patch('os.path.exists', return_value=False):
        MetricsCollector._CheckAndSetDisabledCache()
        self.assertTrue(MetricsCollector._disabled_cache)
        self.assertEqual(None, MetricsCollector.GetCollector())

    # Test when gsutil is not part of the Cloud SDK and there is a UUID file.
    with mock.patch.dict(os.environ, values={'CLOUDSDK_WRAPPER': ''}):
      with mock.patch('os.path.exists', return_value=True):
        # Mock the contents of the file.
        with mock.patch('__builtin__.open') as mock_open:
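          # Make the mock usable as a context manager so that
          # "with open(...) as f" yields the mock itself.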
          mock_open.return_value.__enter__ = lambda s: s

          # Set the file.read() method to return the disabled text.
          mock_open.return_value.read.return_value = metrics._DISABLED_TEXT
          MetricsCollector._CheckAndSetDisabledCache()
          self.assertTrue(MetricsCollector._disabled_cache)
          self.assertEqual(None, MetricsCollector.GetCollector())

          # Set the file.read() method to return a mock cid (analytics
          # enabled).
          mock_open.return_value.read.return_value = 'mock_cid'
          MetricsCollector._CheckAndSetDisabledCache()
          self.assertFalse(MetricsCollector._disabled_cache)
          self.assertEqual(self.collector, MetricsCollector.GetCollector())

          # Check that open/read was called twice.
          self.assertEqual(2, len(mock_open.call_args_list))
          self.assertEqual(2, len(mock_open.return_value.read.call_args_list))

  def testConfigValueValidation(self):
    """Tests the validation of potentially PII config values."""
    string_and_bool_categories = ['check_hashes', 'content_language',
                                  'disable_analytics_prompt',
                                  'https_validate_certificates',
                                  'json_api_version',
                                  'parallel_composite_upload_component_size',
                                  'parallel_composite_upload_threshold',
                                  'prefer_api',
                                  'sliced_object_download_component_size',
                                  'sliced_object_download_threshold',
                                  'tab_completion_time_logs', 'token_cache',
                                  'use_magicfile']
    int_categories = ['debug', 'default_api_version', 'http_socket_timeout',
                      'max_retry_delay', 'num_retries',
                      'oauth2_refresh_retries', 'parallel_process_count',
                      'parallel_thread_count', 'resumable_threshold',
                      'rsync_buffer_lines',
                      'sliced_object_download_max_components',
                      'software_update_check_period', 'tab_completion_timeout',
                      'task_estimation_threshold']
    all_categories = sorted(string_and_bool_categories + int_categories)

    # Test general invalid values.
    with mock.patch('boto.config.get_value', return_value=None):
      self.assertEqual('', self.collector._ValidateAndGetConfigValues())

    with mock.patch('boto.config.get_value', return_value='invalid string'):
      self.assertEqual(','.join([
          category + ':INVALID' for category in all_categories
      ]), self.collector._ValidateAndGetConfigValues())

    # Test that non-ASCII characters are invalid.
    with mock.patch('boto.config.get_value', return_value='£'):
      self.assertEqual(','.join([
          category + ':INVALID' for category in all_categories
      ]), self.collector._ValidateAndGetConfigValues())

    # Mock valid return values for specific string validations.
    def MockValidStrings(section, category):
      if section == 'GSUtil':
        if category == 'check_hashes':
          return 'if_fast_else_skip'
        if category == 'content_language':
          return 'chi'
        if category == 'json_api_version':
          return 'v3'
        if category == 'prefer_api':
          return 'xml'
        if category in ('disable_analytics_prompt', 'use_magicfile',
                        'tab_completion_time_logs'):
          return 'True'
      if section == 'OAuth2' and category == 'token_cache':
        return 'file_system'
      if section == 'Boto' and category == 'https_validate_certificates':
        return 'True'
      return ''
    with mock.patch('boto.config.get_value', side_effect=MockValidStrings):
      self.assertEqual(
          'check_hashes:if_fast_else_skip,content_language:chi,'
          'disable_analytics_prompt:True,https_validate_certificates:True,'
          'json_api_version:v3,prefer_api:xml,tab_completion_time_logs:True,'
          'token_cache:file_system,use_magicfile:True',
          self.collector._ValidateAndGetConfigValues())

    # Test that "small" and "large" integers are appropriately validated.
    def MockValidSmallInts(_, category):
      if category in int_categories:
        return '1999'
      return ''
    with mock.patch('boto.config.get_value', side_effect=MockValidSmallInts):
      self.assertEqual(
          'debug:1999,default_api_version:1999,http_socket_timeout:1999,'
          'max_retry_delay:1999,num_retries:1999,oauth2_refresh_retries:1999,'
          'parallel_process_count:1999,parallel_thread_count:1999,'
          'resumable_threshold:1999,rsync_buffer_lines:1999,'
          'sliced_object_download_max_components:1999,'
          'software_update_check_period:1999,tab_completion_timeout:1999,'
          'task_estimation_threshold:1999',
          self.collector._ValidateAndGetConfigValues())

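    # Judging by the 1999/2001 cases, most integer categories are validated
    # against an upper bound and report INVALID above it, while thresholds
    # that can legitimately be large (e.g. resumable_threshold,
    # rsync_buffer_lines, task_estimation_threshold) are reported as-is.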
    def MockValidLargeInts(_, category):
      if category in int_categories:
        return '2001'
      return ''
    with mock.patch('boto.config.get_value', side_effect=MockValidLargeInts):
      self.assertEqual(
          'debug:INVALID,default_api_version:INVALID,'
          'http_socket_timeout:INVALID,max_retry_delay:INVALID,'
          'num_retries:INVALID,oauth2_refresh_retries:INVALID,'
          'parallel_process_count:INVALID,parallel_thread_count:INVALID,'
          'resumable_threshold:2001,rsync_buffer_lines:2001,'
          'sliced_object_download_max_components:INVALID,'
          'software_update_check_period:INVALID,'
          'tab_completion_timeout:INVALID,task_estimation_threshold:2001',
          self.collector._ValidateAndGetConfigValues())

    # Test that a non-integer return value is invalid.
    def MockNonIntegerValue(_, category):
      if category in int_categories:
        return '10.28'
      return ''
    with mock.patch('boto.config.get_value', side_effect=MockNonIntegerValue):
      self.assertEqual(
          ','.join([category + ':INVALID' for category in int_categories]),
          self.collector._ValidateAndGetConfigValues())

    # Test data size validation.
    def MockDataSizeValue(_, category):
      if category in ('parallel_composite_upload_component_size',
                      'parallel_composite_upload_threshold',
                      'sliced_object_download_component_size',
                      'sliced_object_download_threshold'):
        return '10MiB'
      return ''
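    # The 10MiB values above should be normalized to a byte count:
    # 10 * 1024 * 1024 = 10485760.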
    with mock.patch('boto.config.get_value', side_effect=MockDataSizeValue):
      self.assertEqual('parallel_composite_upload_component_size:10485760,'
                       'parallel_composite_upload_threshold:10485760,'
                       'sliced_object_download_component_size:10485760,'
                       'sliced_object_download_threshold:10485760',
                       self.collector._ValidateAndGetConfigValues())

  def testCommandAndErrorEventsCollection(self):
    """Tests the collection of command and error GA events."""
    self.assertEqual([], self.collector._metrics)

    _LogAllTestMetrics()
    # Only the first command should be logged.
    metrics.LogCommandParams(command_name='cmd2')

    # Commands and errors should not be collected until we explicitly collect
    # them.
    self.assertEqual([], self.collector._metrics)
    self.collector._CollectCommandAndErrorMetrics()
    self.assertEqual(COMMAND_AND_ERROR_TEST_METRICS,
                     set(self.collector._metrics))

  def testPerformanceSummaryEventCollection(self):
    """Tests the collection of PerformanceSummary GA events."""
    # PerformanceSummaries are only collected for cp and rsync.
    self.collector.ga_params[metrics._GA_LABEL_MAP['Command Name']] = 'cp'
    # GetDiskCounters is called at initialization of _PerformanceSummaryParams,
    # which occurs during the first call to LogPerformanceSummaryParams.
    with mock.patch('gslib.metrics.GetDiskCounters',
                    return_value={'fake-disk': (0, 0, 0, 0, 0, 0)}):
      metrics.LogPerformanceSummaryParams(
          uses_fan=True, uses_slice=True, avg_throughput=10,
          is_daisy_chain=True, has_file_dst=False, has_cloud_dst=True,
          has_file_src=False, has_cloud_src=True, total_bytes_transferred=100,
          total_elapsed_time=10, thread_idle_time=40, thread_execution_time=10,
          num_processes=2, num_threads=3, num_objects_transferred=3,
          provider_types=['gs'])

    # Log a retryable service error and two retryable network errors.
    service_retry_msg = RetryableErrorMessage(
        apitools_exceptions.CommunicationError(), 0)
    network_retry_msg = RetryableErrorMessage(socket.error(), 0)
    metrics.LogRetryableError(service_retry_msg)
    metrics.LogRetryableError(network_retry_msg)
    metrics.LogRetryableError(network_retry_msg)

    # Log some thread throughput.
    start_file_msg = FileMessage('src', 'dst', 0, size=100)
    end_file_msg = FileMessage('src', 'dst', 10, finished=True)
    start_file_msg.thread_id = end_file_msg.thread_id = 1
    start_file_msg.process_id = end_file_msg.process_id = 1
    metrics.LogPerformanceSummaryParams(file_message=start_file_msg)
    metrics.LogPerformanceSummaryParams(file_message=end_file_msg)
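    # The two FileMessages are 10 seconds apart and report a size of 100
    # bytes, so the (process 1, thread 1) throughput should be 100 / 10 = 10.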
    self.assertEqual(self.collector.perf_sum_params.thread_throughputs[
        (1, 1)].GetThroughput(), 10)

    # GetDiskCounters is called a second time during collection.
    with mock.patch('gslib.metrics.GetDiskCounters',
                    return_value={'fake-disk': (0, 0, 0, 0, 10, 10)}):
      self.collector._CollectPerformanceSummaryMetric()

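    # Derived values checked below: thread idle percent should be
    # 40 / (40 + 10) = 0.8 from the idle and execution times logged above, and
    # disk I/O time should be (10 - 0) + (10 - 0) = 20 from the read and write
    # time fields of the mocked disk counters.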
    # Check for all the expected parameters.
    metric_body = self.collector._metrics[0].body
    label_and_value_pairs = [
        ('Event Category', metrics._GA_PERFSUM_CATEGORY),
        ('Event Action', 'CloudToCloud%2CDaisyChain'), ('Execution Time', '10'),
        ('Parallelism Strategy', 'both'), ('Source URL Type', 'cloud'),
        ('Provider Types', 'gs'), ('Num Processes', '2'), ('Num Threads', '3'),
        ('Number of Files/Objects Transferred', '3'),
        ('Size of Files/Objects Transferred', '100'),
        ('Average Overall Throughput', '10'),
        ('Num Retryable Service Errors', '1'),
        ('Num Retryable Network Errors', '2'),
        ('Thread Idle Time Percent', '0.8'),
        ('Slowest Thread Throughput', '10'),
        ('Fastest Thread Throughput', '10'),
    ]
    if IS_LINUX:  # Disk I/O time is only available on Linux.
      label_and_value_pairs.append(('Disk I/O Time', '20'))
    for label, exp_value in label_and_value_pairs:
      self.assertIn('{0}={1}'.format(metrics._GA_LABEL_MAP[label], exp_value),
                    metric_body)

  def testCommandCollection(self):
    """Tests the collection of command parameters."""
    _TryExceptAndPass(self.command_runner.RunNamedCommand,
                      'acl', ['set', '-a'], collect_analytics=True)
    self.assertEqual(
        'acl set',
        self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Name']))
    self.assertEqual('a', self.collector.ga_params.get(metrics._GA_LABEL_MAP[
        'Command-Level Options']))

    # Reset the ga_params, which store the command info.
    self.collector.ga_params.clear()

    self.command_runner.RunNamedCommand('list', collect_analytics=True)
    self.assertEqual(
        'ls',
        self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Name']))
    self.assertEqual(
        'list',
        self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Alias']))

    self.collector.ga_params.clear()
    _TryExceptAndPass(
        self.command_runner.RunNamedCommand,
        'iam', ['get', 'dummy_bucket'], collect_analytics=True)
    self.assertEqual(
        'iam get',
        self.collector.ga_params.get(metrics._GA_LABEL_MAP['Command Name']))

  # We only care about the error logging, not the actual exception handling.
  @mock.patch.object(http_wrapper, 'HandleExceptionsAndRebuildHttpConnections')
  def testRetryableErrorCollection(self, mock_default_retry):
    """Tests the collection of a retryable error in the retry function."""
    # A DiscardMessagesQueue has the same retryable error-logging code as the
    # UIThread and the MainThreadUIQueue.
    mock_queue = RetryableErrorsQueue()
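    # Only the exception field of ExceptionRetryArgs matters here: the logging
    # path reads the exception's type name, and the default retry handler is
    # mocked out above, so the other fields can be None.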
    value_error_retry_args = http_wrapper.ExceptionRetryArgs(None, None,
                                                             ValueError(), None,
                                                             None, None)
    socket_error_retry_args = http_wrapper.ExceptionRetryArgs(None, None,
                                                              socket.error(),
                                                              None, None, None)
    metadata_retry_func = LogAndHandleRetries(is_data_transfer=False,
                                              status_queue=mock_queue)
    media_retry_func = LogAndHandleRetries(is_data_transfer=True,
                                           status_queue=mock_queue)

    metadata_retry_func(value_error_retry_args)
    self.assertEqual(self.collector.retryable_errors['ValueError'], 1)
    metadata_retry_func(value_error_retry_args)
    self.assertEqual(self.collector.retryable_errors['ValueError'], 2)
    metadata_retry_func(socket_error_retry_args)
    self.assertEqual(self.collector.retryable_errors['SocketError'], 1)

    # The media retry function raises an exception after logging because
    # the GcsJsonApi handles retryable errors for media transfers itself.
    _TryExceptAndPass(media_retry_func, value_error_retry_args)
    _TryExceptAndPass(media_retry_func, socket_error_retry_args)
    self.assertEqual(self.collector.retryable_errors['ValueError'], 3)
    self.assertEqual(self.collector.retryable_errors['SocketError'], 2)

  def testExceptionCatchingDecorator(self):
    """Tests the exception catching decorator CaptureAndLogException."""

    # A wrapped function with an exception should not stop the process.
    mock_exc_fn = mock.MagicMock(__name__='mock_exc_fn',
                                 side_effect=Exception())
    wrapped_fn = metrics.CaptureAndLogException(mock_exc_fn)
    wrapped_fn()

    debug_messages = self.log_handler.messages['debug']
    self.assertIn('Exception captured in mock_exc_fn during metrics collection',
                  debug_messages[0])
    self.log_handler.reset()

    self.assertEqual(1, mock_exc_fn.call_count)

    mock_err_fn = mock.MagicMock(__name__='mock_err_fn',
                                 side_effect=TypeError())
    wrapped_fn = metrics.CaptureAndLogException(mock_err_fn)
    wrapped_fn()
    self.assertEqual(1, mock_err_fn.call_count)

    debug_messages = self.log_handler.messages['debug']
    self.assertIn('Exception captured in mock_err_fn during metrics collection',
                  debug_messages[0])
    self.log_handler.reset()

    # Test that exceptions in the decorated metrics functions are caught.
    with mock.patch.object(MetricsCollector, 'GetCollector',
                           return_value='not a collector'):
      # These calls should all fail, but the exceptions shouldn't propagate up.
      metrics.Shutdown()
      metrics.LogCommandParams()
      metrics.LogRetryableError()
      metrics.LogFatalError()
      metrics.LogPerformanceSummaryParams()
      metrics.CheckAndMaybePromptForAnalyticsEnabling('invalid argument')

      debug_messages = self.log_handler.messages['debug']
      message_index = 0
      for func_name in ('Shutdown', 'LogCommandParams', 'LogRetryableError',
                        'LogFatalError', 'LogPerformanceSummaryParams',
                        'CheckAndMaybePromptForAnalyticsEnabling'):
        self.assertIn(
            'Exception captured in %s during metrics collection' % func_name,
            debug_messages[message_index])
        message_index += 1

      self.log_handler.reset()


# Mock callback handlers to throw errors in integration tests, based on
# handlers from test_cp.py.
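# The handlers are pickled into a file that is passed to cp via the
# --testcallbackfile flag, which invokes call() as a progress callback.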
class _JSONForceHTTPErrorCopyCallbackHandler(object):
  """Test callback handler that raises an arbitrary HTTP error exception."""

  def __init__(self, startover_at_byte, http_error_num):
    self._startover_at_byte = startover_at_byte
    self._http_error_num = http_error_num
    self.started_over_once = False

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, unused_total_size):
    """Raises an HTTP error once the transfer passes the halting point."""
    if (total_bytes_transferred >= self._startover_at_byte and
        not self.started_over_once):
      self.started_over_once = True
      raise apitools_exceptions.HttpError({'status': self._http_error_num},
                                          None, None)


class _ResumableUploadRetryHandler(object):
  """Test callback handler for causing retries during a resumable transfer."""

  def __init__(self, retry_at_byte, exception_to_raise, exc_args,
               num_retries=1):
    self._retry_at_byte = retry_at_byte
    self._exception_to_raise = exception_to_raise
    self._exception_args = exc_args
    self._num_retries = num_retries

    self._retries_made = 0

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, unused_total_size):
    """Causes a single retry at the retry point."""
    if (total_bytes_transferred >= self._retry_at_byte and
        self._retries_made < self._num_retries):
      self._retries_made += 1
      raise self._exception_to_raise(*self._exception_args)


class TestMetricsIntegrationTests(testcase.GsUtilIntegrationTestCase):
  """Integration tests for analytics data collection."""

  def setUp(self):
    super(TestMetricsIntegrationTests, self).setUp()

    # Save the original state of the collector.
    self.original_collector_instance = MetricsCollector.GetCollector()

    # Set dummy attributes for the collector.
    MetricsCollector.StartTestCollector('https://example.com', 'user-agent-007',
                                        {'a': 'b', 'c': 'd'})
    self.collector = MetricsCollector.GetCollector()

  def tearDown(self):
    super(TestMetricsIntegrationTests, self).tearDown()

    # Reset to default collection settings.
    MetricsCollector.StopTestCollector(
        original_instance=self.original_collector_instance)

  def _RunGsUtilWithAnalyticsOutput(self, cmd, expected_status=0):
    """Runs the gsutil command to check for metrics log output.

    The GSUTIL_TEST_ANALYTICS env value is set so that the metrics collector
    in the subprocess uses testing parameters and writes the collected metrics
    to the debug log, which lets us check stderr for proper collection.

    Args:
      cmd: The command to run, as a list.
      expected_status: The expected return code.

    Returns:
      The string of metrics output.
    """
    stderr = self.RunGsUtil(['-d'] + cmd, return_stderr=True,
                            expected_status=expected_status,
                            env_vars={'GSUTIL_TEST_ANALYTICS': '2'})
    return METRICS_LOG_RE.search(stderr).group()

  def _StartObjectPatch(self, *args, **kwargs):
    """Runs mock.patch.object with the given args, and returns the mock object.

    This starts the patcher, returns the mock object, and registers the patcher
    to stop on test teardown.

    Args:
      *args: The args to pass to mock.patch.object()
      **kwargs: The kwargs to pass to mock.patch.object()

    Returns:
      Mock, the result of starting the patcher.
    """
    patcher = mock.patch.object(*args, **kwargs)
    self.addCleanup(patcher.stop)
    return patcher.start()

  def _CheckParameterValue(self, param_name, exp_value, metrics_to_search):
    """Checks for a correct key=value pair in a log output string."""
    self.assertIn(
        '{0}={1}'.format(metrics._GA_LABEL_MAP[param_name], exp_value),
        metrics_to_search)

  @mock.patch('time.time', new=mock.MagicMock(return_value=0))
  def testMetricsReporting(self):
    """Tests the subprocess creation by Popen in metrics.py."""
    popen_mock = self._StartObjectPatch(subprocess, 'Popen')

    # Set up the temp file for pickle dumping metrics into.
    metrics_file = tempfile.NamedTemporaryFile()
    metrics_file.close()
    temp_file_mock = self._StartObjectPatch(tempfile, 'NamedTemporaryFile')
    temp_file_mock.return_value = open(metrics_file.name, 'wb')

    # If there are no metrics, Popen should not be called.
    self.collector.ReportMetrics()
    self.assertEqual(0, popen_mock.call_count)

    _LogAllTestMetrics()

    # Report the metrics and check Popen calls.
    metrics.Shutdown()
    call_list = popen_mock.call_args_list
    self.assertEqual(1, len(call_list))
    # Check to make sure that we have the proper PYTHONPATH in the subprocess.
    args = call_list[0]
    self.assertIn('PYTHONPATH', args[1]['env'])
    # Ensure that we can access the same modules as the main process from
    # PYTHONPATH.
    missing_paths = (
        set(sys.path) - set(args[1]['env']['PYTHONPATH'].split(os.pathsep)))
    self.assertEqual(set(), missing_paths)

    # Check that the metrics were correctly dumped into the temp file.
    with open(metrics_file.name, 'rb') as metrics_file:
      reported_metrics = pickle.load(metrics_file)
    self.assertEqual(COMMAND_AND_ERROR_TEST_METRICS,
                     set(reported_metrics))

  @mock.patch('time.time', new=mock.MagicMock(return_value=0))
  def testMetricsPosting(self):
    """Tests metrics posting as performed in metrics_reporter.py."""
    # Clear the log file.
    open(LOG_FILE_PATH, 'w').close()
    metrics.LogCommandParams(
        global_opts=[('-y', 'value'), ('-z', ''), ('-x', '')])

    # Collect a metric and set log level for the metrics_reporter subprocess.
    def CollectMetricAndSetLogLevel(log_level):
      metrics.LogCommandParams(command_name='cmd1', subcommands=['action1'],
                               sub_opts=[('optb', ''), ('opta', '')])
      metrics.LogFatalError(gslib.exception.CommandException('test'))

      # Wait for the report so the log is written before we check it.
      self.collector.ReportMetrics(wait_for_report=True, log_level=log_level)
      self.assertEqual([], self.collector._metrics)

    # The log file should have content only when the log level is DEBUG.
    CollectMetricAndSetLogLevel(logging.DEBUG)
    with open(LOG_FILE_PATH, 'rb') as metrics_log:
      log_text = metrics_log.read()
    expected_response = (
        'Metric(endpoint=\'https://example.com\', method=\'POST\', '
        'body=\'{0}&cm2=0&ea=cmd1+action1&ec={1}&el={2}&ev=0\', '
        'user_agent=\'user-agent-007\')'.format(GLOBAL_DIMENSIONS_URL_PARAMS,
                                                metrics._GA_COMMANDS_CATEGORY,
                                                VERSION))
    self.assertIn(expected_response, log_text)
    self.assertIn('RESPONSE: 200', log_text)

    CollectMetricAndSetLogLevel(logging.INFO)
    with open(LOG_FILE_PATH, 'rb') as metrics_log:
      log_text = metrics_log.read()
    self.assertEqual(log_text, '')

    CollectMetricAndSetLogLevel(logging.WARN)
    with open(LOG_FILE_PATH, 'rb') as metrics_log:
      log_text = metrics_log.read()
    self.assertEqual(log_text, '')

  def testMetricsReportingWithFail(self):
    """Tests that a metrics reporting failure does not raise an exception."""
    popen_mock = self._StartObjectPatch(subprocess, 'Popen')
    popen_mock.side_effect = OSError()

    self.collector._metrics.append('dummy metric')
    # Shouldn't raise an exception.
    self.collector.ReportMetrics()

    self.assertTrue(popen_mock.called)

  def testCommandCollection(self):
    """Tests the collection of commands."""
    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['-m', 'acl', 'set', '-a'], expected_status=1)
    self._CheckParameterValue('Event Category', metrics._GA_COMMANDS_CATEGORY,
                              metrics_list)
    self._CheckParameterValue('Event Action', 'acl+set', metrics_list)
    # Check that the options were collected.
    self._CheckParameterValue('Global Options', 'd%2Cm', metrics_list)
    self._CheckParameterValue('Command-Level Options', 'a', metrics_list)

    metrics_list = self._RunGsUtilWithAnalyticsOutput(['ver'])
    self._CheckParameterValue('Event Category', metrics._GA_COMMANDS_CATEGORY,
                              metrics_list)
    self._CheckParameterValue('Event Action', 'version', metrics_list)
    # Check the recording of the command alias.
    self._CheckParameterValue('Command Alias', 'ver', metrics_list)

  def testRetryableErrorMetadataCollection(self):
    """Tests that retryable errors are collected on JSON metadata operations."""
    # Retryable errors will only be collected with the JSON API.
    if self.test_api != ApiSelector.JSON:
      raise unittest.SkipTest('Retryable errors are only collected in JSON')

    bucket_uri = self.CreateBucket()
    object_uri = self.CreateObject(bucket_uri=bucket_uri,
                                   object_name='foo', contents='bar')
    # Set the command name to rsync in order to collect PerformanceSummary
    # info.
    self.collector.ga_params[metrics._GA_LABEL_MAP['Command Name']] = 'rsync'
    # Generate a JSON API instance to test with, because the RunGsUtil method
    # may use the XML API.
    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
                            RetryableErrorsQueue(), self.default_provider)
    # Don't wait for too many retries or for long periods between retries to
    # avoid long tests.
    gsutil_api.api_client.num_retries = 2
    gsutil_api.api_client.max_retry_wait = 1

    # Throw an error when transferring metadata.
    key = object_uri.get_key()
    src_obj_metadata = apitools_messages.Object(name=key.name,
                                                bucket=key.bucket.name,
                                                contentType=key.content_type)
    dst_obj_metadata = apitools_messages.Object(
        bucket=src_obj_metadata.bucket,
        name=self.MakeTempName('object'),
        contentType=src_obj_metadata.contentType)
    with mock.patch.object(http_wrapper, '_MakeRequestNoRetry',
                           side_effect=socket.error()):
      _TryExceptAndPass(gsutil_api.CopyObject, src_obj_metadata,
                        dst_obj_metadata)
    self.assertEqual(self.collector.retryable_errors['SocketError'], 1)

    # Throw an error when removing a bucket.
    with mock.patch.object(
        http_wrapper, '_MakeRequestNoRetry',
        side_effect=apitools_exceptions.HttpError('unused', 'unused',
                                                  'unused')):
      _TryExceptAndPass(gsutil_api.DeleteObject, bucket_uri.bucket_name,
                        object_uri.object_name)
    self.assertEqual(self.collector.retryable_errors['HttpError'], 1)

    # Check that the number of each kind of retryable error was logged.
    self.assertEqual(
        self.collector.perf_sum_params.num_retryable_network_errors, 1)
    self.assertEqual(
        self.collector.perf_sum_params.num_retryable_service_errors, 1)

  def testRetryableErrorMediaCollection(self):
    """Tests that retryable errors are collected on JSON media operations."""
    # Retryable errors will only be collected with the JSON API.
    if self.test_api != ApiSelector.JSON:
      raise unittest.SkipTest('Retryable errors are only collected in JSON')

    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB))]
    bucket_uri = self.CreateBucket()
    # For the resumable upload exception, we need to ensure at least one
    # callback occurs.
    halt_size = START_CALLBACK_PER_BYTES * 2
    fpath = self.CreateTempFile(contents='a' * halt_size)

    # Test that the retry function for data transfers catches and logs an
    # error.
    test_callback_file = self.CreateTempFile(contents=pickle.dumps(
        _ResumableUploadRetryHandler(5, apitools_exceptions.BadStatusCodeError,
                                     ('unused', 'unused', 'unused'))))
    with SetBotoConfigForTest(boto_config_for_test):
      metrics_list = self._RunGsUtilWithAnalyticsOutput(
          ['cp', '--testcallbackfile', test_callback_file,
           fpath, suri(bucket_uri)])
      self._CheckParameterValue('Event Category',
                                metrics._GA_ERRORRETRY_CATEGORY, metrics_list)
      self._CheckParameterValue('Event Action', 'BadStatusCodeError',
                                metrics_list)
      self._CheckParameterValue('Retryable Errors', '1', metrics_list)
      self._CheckParameterValue('Num Retryable Service Errors', '1',
                                metrics_list)

    # Test that the ResumableUploadStartOverException in copy_helper is caught.
    test_callback_file = self.CreateTempFile(
        contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404)))
    with SetBotoConfigForTest(boto_config_for_test):
      metrics_list = self._RunGsUtilWithAnalyticsOutput(
          ['cp', '--testcallbackfile', test_callback_file,
           fpath, suri(bucket_uri)])
      self._CheckParameterValue(
          'Event Category', metrics._GA_ERRORRETRY_CATEGORY, metrics_list)
      self._CheckParameterValue(
          'Event Action', 'ResumableUploadStartOverException', metrics_list)
      self._CheckParameterValue('Retryable Errors', '1', metrics_list)
      self._CheckParameterValue(
          'Num Retryable Service Errors', '1', metrics_list)

    # Test retryable error collection in a multithread/multiprocess situation.
    test_callback_file = self.CreateTempFile(
        contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404)))
    with SetBotoConfigForTest(boto_config_for_test):
      metrics_list = self._RunGsUtilWithAnalyticsOutput(
          ['-m', 'cp', '--testcallbackfile',
           test_callback_file, fpath, suri(bucket_uri)])
      self._CheckParameterValue('Event Category',
                                metrics._GA_ERRORRETRY_CATEGORY, metrics_list)
      self._CheckParameterValue(
          'Event Action', 'ResumableUploadStartOverException', metrics_list)
      self._CheckParameterValue('Retryable Errors', '1', metrics_list)
      self._CheckParameterValue(
          'Num Retryable Service Errors', '1', metrics_list)

  def testFatalErrorCollection(self):
    """Tests that fatal errors are collected."""
    def CheckForCommandException(log_output):
      self._CheckParameterValue('Event Category',
                                metrics._GA_ERRORFATAL_CATEGORY, log_output)
      self._CheckParameterValue('Event Action', 'CommandException', log_output)

    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['invalid-command'], expected_status=1)
    CheckForCommandException(metrics_list)

    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['mb', '-invalid-option'], expected_status=1)
    CheckForCommandException(metrics_list)

    bucket_uri = self.CreateBucket()
    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['cp', suri(bucket_uri), suri(bucket_uri)], expected_status=1)
    CheckForCommandException(metrics_list)

  def _GetAndCheckAllNumberMetrics(self, metrics_to_search, multithread=True):
    """Checks number metrics for PerformanceSummary tests.

    Args:
      metrics_to_search: The string of metrics to search.
      multithread: False if the metrics were collected in a non-multithread
                   situation.

    Returns:
      (slowest_throughput, fastest_throughput, io_time) as floats.
    """
    def _ExtractNumberMetric(param_name):
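      # Matches '<ga-parameter>=<number>&' in the metric body and returns the
      # numeric portion as a float.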
      extracted_match = re.search(
          metrics._GA_LABEL_MAP[param_name] + r'=(\d+\.?\d*)&',
          metrics_to_search)
      if not extracted_match:
        self.fail('Could not find %s (%s) in metrics string %s' %
                  (metrics._GA_LABEL_MAP[param_name], param_name,
                   metrics_to_search))
      return float(extracted_match.group(1))

    # Thread idle time will only be recorded in a multithread situation.
    if multithread:
      thread_idle_time = _ExtractNumberMetric('Thread Idle Time Percent')
      # This should be a decimal between 0 and 1.
      self.assertGreaterEqual(thread_idle_time, 0)
      self.assertLessEqual(thread_idle_time, 1)

    throughput = _ExtractNumberMetric('Average Overall Throughput')
    self.assertGreater(throughput, 0)

    slowest_throughput = _ExtractNumberMetric('Slowest Thread Throughput')
    fastest_throughput = _ExtractNumberMetric('Fastest Thread Throughput')
    self.assertGreaterEqual(fastest_throughput, slowest_throughput)
    self.assertGreater(slowest_throughput, 0)
    self.assertGreater(fastest_throughput, 0)

    io_time = None
    if IS_LINUX:
      io_time = _ExtractNumberMetric('Disk I/O Time')
      self.assertGreaterEqual(io_time, 0)

    # Return some metrics to run tests in more specific scenarios.
    return (slowest_throughput, fastest_throughput, io_time)

  def testPerformanceSummaryFileToFile(self):
    """Tests PerformanceSummary collection in a file-to-file transfer."""
    tmpdir1 = self.CreateTempDir()
    tmpdir2 = self.CreateTempDir()
    file_size = ONE_MIB
    self.CreateTempFile(tmpdir=tmpdir1, contents='a' * file_size)

    # Run an rsync file-to-file command with fan parallelism, without slice
    # parallelism.
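    # gsutil does not use multiple processes on Windows, so run single-process
    # there.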
    process_count = 1 if IS_WINDOWS else 6
    with SetBotoConfigForTest([
        ('GSUtil', 'parallel_process_count', str(process_count)),
        ('GSUtil', 'parallel_thread_count', '7')]):
      metrics_list = self._RunGsUtilWithAnalyticsOutput(
          ['-m', 'rsync', tmpdir1, tmpdir2])
      self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY,
                                metrics_list)
      self._CheckParameterValue('Event Action', 'FileToFile', metrics_list)
      self._CheckParameterValue('Parallelism Strategy', 'fan', metrics_list)
      self._CheckParameterValue('Source URL Type', 'file', metrics_list)
      self._CheckParameterValue('Num Processes', str(process_count),
                                metrics_list)
      self._CheckParameterValue('Num Threads', '7', metrics_list)
      self._CheckParameterValue('Provider Types', 'file', metrics_list)
      self._CheckParameterValue('Size of Files/Objects Transferred', file_size,
                                metrics_list)
      self._CheckParameterValue('Number of Files/Objects Transferred', 1,
                                metrics_list)

      (_, _, io_time) = self._GetAndCheckAllNumberMetrics(metrics_list)
      if IS_LINUX:  # io_time will be None on other platforms.
        # We can't guarantee that the file read/write will consume a
        # reportable amount of disk I/O, but it should be reported as >= 0.
        self.assertGreaterEqual(io_time, 0)

  @SkipForS3('No slice parallelism support for S3.')
  def testPerformanceSummaryFileToCloud(self):
    """Tests PerformanceSummary collection in a file-to-cloud transfer."""
    bucket_uri = self.CreateBucket()
    tmpdir = self.CreateTempDir()
    file_size = 6
    self.CreateTempFile(tmpdir=tmpdir, contents='a' * file_size)
    self.CreateTempFile(tmpdir=tmpdir, contents='b' * file_size)

    process_count = 1 if IS_WINDOWS else 2
    # Run a parallel composite upload without fan parallelism.
    with SetBotoConfigForTest([
        ('GSUtil', 'parallel_process_count', str(process_count)),
        ('GSUtil', 'parallel_thread_count', '3'),
        ('GSUtil', 'parallel_composite_upload_threshold', '1')]):
      metrics_list = self._RunGsUtilWithAnalyticsOutput(
          ['rsync', tmpdir, suri(bucket_uri)])
      self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY,
                                metrics_list)
      self._CheckParameterValue('Event Action', 'FileToCloud', metrics_list)
      self._CheckParameterValue('Parallelism Strategy', 'slice', metrics_list)
      self._CheckParameterValue('Num Processes', str(process_count),
                                metrics_list)
      self._CheckParameterValue('Num Threads', '3', metrics_list)
      self._CheckParameterValue('Provider Types', 'file%2C' + bucket_uri.scheme,
                                metrics_list)
      self._CheckParameterValue('Size of Files/Objects Transferred',
                                2 * file_size, metrics_list)
      self._CheckParameterValue('Number of Files/Objects Transferred', 2,
                                metrics_list)
      (_, _, io_time) = self._GetAndCheckAllNumberMetrics(metrics_list)
      if IS_LINUX:  # io_time will be None on other platforms.
        # We can't guarantee that the file read will consume a
        # reportable amount of disk I/O, but it should be reported as >= 0.
        self.assertGreaterEqual(io_time, 0)

  @SkipForS3('No slice parallelism support for S3.')
  def testPerformanceSummaryCloudToFile(self):
    """Tests PerformanceSummary collection in a cloud-to-file transfer."""
    bucket_uri = self.CreateBucket()
    file_size = 6
    object_uri = self.CreateObject(
        bucket_uri=bucket_uri, contents='a' * file_size)

    fpath = self.CreateTempFile()
    # Run a sliced object download with fan parallelism.
    process_count = 1 if IS_WINDOWS else 4
    with SetBotoConfigForTest([
        ('GSUtil', 'parallel_process_count', str(process_count)),
        ('GSUtil', 'parallel_thread_count', '5'),
        ('GSUtil', 'sliced_object_download_threshold', '1'),
        ('GSUtil', 'test_assume_fast_crcmod', 'True')]):
      metrics_list = self._RunGsUtilWithAnalyticsOutput(
          ['-m', 'cp', suri(object_uri), fpath])
      self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY,
                                metrics_list)
      self._CheckParameterValue('Event Action', 'CloudToFile', metrics_list)
      self._CheckParameterValue('Parallelism Strategy', 'both', metrics_list)
      self._CheckParameterValue('Num Processes', str(process_count),
                                metrics_list)
      self._CheckParameterValue('Num Threads', '5', metrics_list)
      self._CheckParameterValue('Provider Types', 'file%2C' + bucket_uri.scheme,
                                metrics_list)
      self._CheckParameterValue('Number of Files/Objects Transferred', '1',
                                metrics_list)
      self._CheckParameterValue('Size of Files/Objects Transferred', file_size,
                                metrics_list)
      (_, _, io_time) = self._GetAndCheckAllNumberMetrics(metrics_list)
      if IS_LINUX:  # io_time will be None on other platforms.
        # We can't guarantee that the file write will consume a
        # reportable amount of disk I/O, but it should be reported as >= 0.
        self.assertGreaterEqual(io_time, 0)

  def testPerformanceSummaryCloudToCloud(self):
    """Tests PerformanceSummary collection in a cloud-to-cloud transfer."""
    bucket1_uri = self.CreateBucket()
    bucket2_uri = self.CreateBucket()
    file_size = 6
    key_uri = self.CreateObject(
        bucket_uri=bucket1_uri, contents='a' * file_size)

    # Run a daisy-chain cloud-to-cloud copy without parallelism.
    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['cp', '-D', suri(key_uri), suri(bucket2_uri)])

    (slowest_throughput, fastest_throughput,
     _) = self._GetAndCheckAllNumberMetrics(metrics_list, multithread=False)
    # Since there's a single thread, this must be the case.
    self.assertEqual(slowest_throughput, fastest_throughput)

    self._CheckParameterValue('Event Category', metrics._GA_PERFSUM_CATEGORY,
                              metrics_list)
    self._CheckParameterValue('Event Action', 'CloudToCloud%2CDaisyChain',
                              metrics_list)
    self._CheckParameterValue('Parallelism Strategy', 'none', metrics_list)
    self._CheckParameterValue('Source URL Type', 'cloud', metrics_list)
    self._CheckParameterValue('Num Processes', '1', metrics_list)
    self._CheckParameterValue('Num Threads', '1', metrics_list)
    self._CheckParameterValue('Provider Types', bucket1_uri.scheme,
                              metrics_list)
    self._CheckParameterValue('Number of Files/Objects Transferred', '1',
                              metrics_list)
    self._CheckParameterValue('Size of Files/Objects Transferred', file_size,
                              metrics_list)

  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
  def testCrossProviderDaisyChainCollection(self):
    """Tests the collection of daisy-chain operations."""
    s3_bucket = self.CreateBucket(provider='s3')
    gs_bucket = self.CreateBucket(provider='gs')
    unused_s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo')
    gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar')

    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['rsync', suri(s3_bucket), suri(gs_bucket)])
    self._CheckParameterValue('Event Action', 'CloudToCloud%2CDaisyChain',
                              metrics_list)
    self._CheckParameterValue('Provider Types', 'gs%2Cs3', metrics_list)

    metrics_list = self._RunGsUtilWithAnalyticsOutput(
        ['cp', suri(gs_key), suri(s3_bucket)])
    self._CheckParameterValue('Event Action', 'CloudToCloud%2CDaisyChain',
                              metrics_list)
    self._CheckParameterValue('Provider Types', 'gs%2Cs3', metrics_list)
