# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

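"""Run daily pinpoint jobs and aggregate their results for dashboards.

The 'start' action launches pinpoint jobs for the last commit landed on a
given date, using the job parameters defined in job_configs.json; 'collect'
polls the status of those jobs and downloads their csv results; 'upload'
aggregates the results and uploads the dataset; 'auto' does all of the above
in sequence. State and datasets are cached locally under
_cached_data/pinboard and mirrored to gs://chrome-health-tvdata/pinboard,
from where they can feed Data Studio dashboards.
"""
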
import argparse
import filecmp
import json
import logging
from logging import handlers
import os
import posixpath
import shutil
import subprocess

from core import gsutil
from core.external_modules import numpy as np
from core.external_modules import pandas as pd
from py_utils import tempfile_ext


PINBOARD_DIR = os.path.abspath(os.path.dirname(__file__))
TOOLS_PERF_DIR = os.path.normpath(os.path.join(PINBOARD_DIR, '..', '..'))
CACHED_DATA_DIR = os.path.join(TOOLS_PERF_DIR, '_cached_data', 'pinboard')

PINPOINT_CLI = os.path.join(TOOLS_PERF_DIR, 'pinpoint_cli')
JOB_CONFIGS_PATH = os.path.join(PINBOARD_DIR, 'job_configs.json')

JOBS_STATE_FILE = 'jobs_state.json'
DATASET_PKL_FILE = 'dataset.pkl'
DATASET_CSV_FILE = 'dataset.csv'

CLOUD_STORAGE_DIR = 'gs://chrome-health-tvdata/pinboard'
TZ = 'America/Los_Angeles'  # MTV-time.


# Only these are exported and uploaded to the Cloud Storage dataset.
MEASUREMENTS = set([
    # V8 metrics.
    'JavaScript:duration',
    'Optimize-Background:duration',
    'Optimize:duration',
    'RunsPerMinute',
    'Total-Main-Thread:duration',
    'Total:duration',
    'V8-Only-Main-Thread:duration',
    'V8-Only:duration',
    'memory:chrome:renderer_processes:reported_by_chrome:v8:effective_size',
    'total:500ms_window:renderer_eqt:v8',

    # Startup metrics.
    'experimental_content_start_time',
    'experimental_navigation_start_time',
    'first_contentful_paint_time',
    'messageloop_start_time',
    'navigation_commit_time',
])

# Compute averages over a fixed set of active stories. These may need to be
# periodically updated.
ACTIVE_STORIES = set([
    # v8.browsing_mobile.
    'browse:chrome:newtab:2019',
    'browse:chrome:omnibox:2019',
    'browse:media:facebook_photos:2019',
    'browse:media:flickr_infinite_scroll:2019',
    'browse:media:googleplaystore:2019',
    'browse:media:imgur:2019',
    'browse:media:youtube:2019',
    'browse:news:cricbuzz:2019',
    'browse:news:globo:2019',
    'browse:news:nytimes:2019',
    'browse:news:qq:2019',
    'browse:news:reddit:2019',
    'browse:news:toi:2019',
    'browse:news:washingtonpost:2019',
    'browse:search:amp:sxg:2019',
    'browse:shopping:amazon:2019',
    'browse:shopping:avito:2019',
    'browse:shopping:flipkart:2019',
    'browse:shopping:lazada:2019',
    'browse:social:facebook:2019',
    'browse:social:instagram:2019',
    'browse:social:twitter:2019',
    'browse:tools:maps:2019',

    # v8.browsing_desktop.
    'browse:news:nytimes:2020',
    'browse:news:flipboard:2018',
    'browse:social:facebook_infinite_scroll:2018',
    'browse:tools:sheets:2019',
    'browse:media:tumblr:2018',
    'browse:tools:maps:2019',
    'browse:social:twitter_infinite_scroll:2018',
    'browse:tech:discourse_infinite_scroll:2018',
    'browse:social:twitter:2018',
    'browse:social:tumblr_infinite_scroll:2018',
    'browse:media:googleplaystore:2018',
    'browse:search:google:2018',
    'browse:news:cnn:2018',
    'browse:news:reddit:2018',
    'browse:search:google_india:2018',
    'browse:media:youtubetv:2019',

    # Speedometer2.
    'Speedometer2',
])


def StartPinpointJobs(state, date):
  """Start new pinpoint jobs for the last commit on the given date."""
  revision, timestamp = GetLastCommitOfDate(date)
  if any(item['revision'] == revision for item in state):
    logging.info('No new jobs to start.')
    return

  # Add a new item to the state with info about jobs for this revision.
  logging.info('Starting jobs for %s (%s):', timestamp[:10], revision)
  item = {'revision': revision, 'timestamp': timestamp, 'jobs': []}
  configs = LoadJsonFile(JOB_CONFIGS_PATH)
  for config in configs:
    config['base_git_hash'] = revision
    with tempfile_ext.NamedTemporaryFile() as tmp:
      json.dump(config, tmp)
      # Close to flush the config to disk before the pinpoint CLI reads it.
      tmp.close()
      output = subprocess.check_output(
          ['vpython', PINPOINT_CLI, 'start-job', tmp.name],
          universal_newlines=True).strip()
    logging.info(output)
    assert 'https://pinpoint' in output
    bot = config['configuration']
    item['jobs'].append({
        'id': output.split('/')[-1],
        'status': 'queued',
        'bot': bot
    })
  state.append(item)
  state.sort(key=lambda p: p['timestamp'])  # Keep items sorted by date.


def IsJobFinished(job):
  return job['status'] in ['completed', 'failed']


def CollectPinpointResults(state):
  """Check the status of pinpoint jobs and collect their results."""
  # First, iterate over all unfinished jobs and update their status.
  for item in state:
    active = [job['id'] for job in item['jobs'] if not IsJobFinished(job)]
    if not active:
      continue
    cmd = ['vpython', PINPOINT_CLI, 'status']
    cmd.extend(active)
    output = subprocess.check_output(cmd, universal_newlines=True)
    # Each output line is expected to look like '<job id>: <status>'.
    updates = dict(line.split(': ', 1) for line in output.splitlines())
    logging.info('Got job updates: %s.', updates)
    for job in item['jobs']:
      if job['id'] in updates:
        job['status'] = updates[job['id']]

  # Now download results, if needed, for items whose jobs have all finished.
  for item in state:
    if _SkipProcessing(item):  # Skip if not ready or all failed.
      continue
    output_file = RevisionResultsFile(item)
    if not os.path.exists(output_file):
      cmd = ['vpython', PINPOINT_CLI, 'get-csv', '--output', output_file, '--']
      job_ids = [j['id'] for j in item['jobs'] if j['status'] == 'completed']
      logging.info('Getting csv data for commit: %s.', item['revision'])
      subprocess.check_output(cmd + job_ids)


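# The jobs state (kept in JOBS_STATE_FILE) is a list of items, one per tested
# revision, as built by StartPinpointJobs above:
#   {'revision': <git hash>, 'timestamp': <ISO timestamp>,
#    'jobs': [{'id': <pinpoint job id>, 'status': <e.g. 'queued', 'completed',
#              'failed'>, 'bot': <configuration name>}, ...]}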
def LoadJobsState():
  """Load the latest recorded state of pinpoint jobs."""
  local_path = CachedFilePath(JOBS_STATE_FILE)
  if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
    return LoadJsonFile(local_path)
  else:
    logging.info('No jobs state found. Creating empty state.')
    return []


def UpdateJobsState(state):
  """Write back the updated state of pinpoint jobs.

  If there were any changes to the state, i.e. new jobs were created or
  existing ones completed, both the local cached copy and the backup in cloud
  storage are updated.
  """
  local_path = CachedFilePath(JOBS_STATE_FILE)
  with tempfile_ext.NamedTemporaryFile() as tmp:
    json.dump(state, tmp, sort_keys=True, indent=2, separators=(',', ': '))
    tmp.close()
    if not os.path.exists(local_path) or not filecmp.cmp(tmp.name, local_path):
      shutil.copyfile(tmp.name, local_path)
      UploadToCloudStorage(local_path)


def GetCachedDataset():
  """Load the latest dataset with cached data."""
  local_path = CachedFilePath(DATASET_PKL_FILE)
  if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
    return pd.read_pickle(local_path)
  else:
    return None


def UpdateCachedDataset(df):
  """Write back the dataset with cached data."""
  local_path = CachedFilePath(DATASET_PKL_FILE)
  df.to_pickle(local_path)
  UploadToCloudStorage(local_path)


def GetItemsToUpdate(state):
  """Select items that still need processing, and cached data for the rest.

  Old revisions are filtered out, so only recent data (last 6 months) is kept.

  Returns:
    new_items: A list of job items from which to get new data.
    cached_df: A DataFrame with existing cached data; may be None.
  """
  from_date = str(TimeAgo(months=6).date())
  new_items = [item for item in state if item['timestamp'] > from_date]
  df = GetCachedDataset()
  if df is not None:
    # Keep only cached data for recent revisions.
    recent_revisions = set(item['revision'] for item in new_items)
    df = df[df['revision'].isin(recent_revisions)]
    # Skip items whose results are already in the cache.
    known_revisions = set(df['revision'])
    new_items = [
        item for item in new_items if item['revision'] not in known_revisions]
  return new_items, df


def AggregateAndUploadResults(new_items, cached_df=None):
  """Aggregate the collected results and upload them to cloud storage."""
  dfs = []
  if cached_df is not None:
    dfs.append(cached_df)

  found_new = False
  for item in new_items:
    if _SkipProcessing(item):  # Jobs are not ready, or all have failed.
      continue
    if not found_new:
      logging.info('Processing data from new results:')
      found_new = True
    logging.info('- %s (%s)', item['timestamp'][:10], item['revision'])
    dfs.append(GetRevisionResults(item))

  if not found_new:
    logging.info('No new data found.')
    return

  # Otherwise update our cache and upload.
  df = pd.concat(dfs, ignore_index=True)
  UpdateCachedDataset(df)

  # Drop revisions with no results and mark the latest result for each metric,
  # both with and without the patch, as a 'reference'. This allows building
  # score cards in Data Studio dashboards that compare the most recent results.
  df = df[df['count'] > 0].copy()
  latest_result = df.groupby(
      ['label', 'benchmark', 'name'])['timestamp'].transform('max')
  df['reference'] = df['timestamp'] == latest_result

  dataset_file = CachedFilePath(DATASET_CSV_FILE)
  df.to_csv(dataset_file, index=False)
  UploadToCloudStorage(dataset_file)
  logging.info('Total %s rows of data uploaded.', len(df.index))


def GetRevisionResults(item):
  """Aggregate the results from jobs that ran on a particular revision."""
  # First load the pinpoint csv results into a DataFrame. The dtype arg is
  # needed to ensure that job_id values are always read as strings (even if
  # some of them look like large numbers).
  df = pd.read_csv(RevisionResultsFile(item), dtype={'job_id': str})
  assert df['change'].str.contains(item['revision']).all(), (
      'Not all results match the expected git revision')

  # Keep only the measurements and stories that we care about.
  df = df[df['name'].isin(MEASUREMENTS)]
  df = df[df['story'].isin(ACTIVE_STORIES)]

  if not df.empty:
    # Aggregate over the results of individual stories.
    df = df.groupby(['change', 'job_id', 'name', 'benchmark',
                     'unit'])['mean'].agg(['mean', 'count']).reset_index()
  else:
    # Otherwise build a single row with an "empty" aggregate for this revision.
    # This is needed so we can remember in the cache that this revision has
    # been processed.
    df = pd.DataFrame(index=[0])
    df['change'] = item['revision']
    df['job_id'] = '(missing)'
    df['name'] = '(missing)'
    df['benchmark'] = '(missing)'
    df['unit'] = ''
    df['mean'] = np.nan
    df['count'] = 0

  # Convert time units from milliseconds to seconds. This is what Data Studio
  # dashboards expect.
  is_ms_unit = df['unit'].str.startswith('ms_')
  df.loc[is_ms_unit, 'mean'] = df['mean'] / 1000

  # Distinguish jobs that ran with/without the tested patch.
  df['label'] = df['change'].str.contains(r'\+').map(
      {False: 'without_patch', True: 'with_patch'})

  # Add timestamp and revision information. We snap the date to noon and make
  # it naive (i.e. no timezone), so the dashboard doesn't get confused with
  # dates close to the end of day.
  date = item['timestamp'].split('T')[0] + 'T12:00:00'
  df['timestamp'] = pd.Timestamp(date)
  df['revision'] = item['revision']

  # Fake the timestamp of jobs without the patch to appear as if they ran a
  # year ago; this makes it easier to visualize and compare timeseries from
  # runs with/without the patch in Data Studio dashboards.
  df.loc[df['label'] == 'without_patch', 'timestamp'] = (
      df['timestamp'] - pd.DateOffset(years=1))

  # Label each row with the bot that ran the corresponding job.
  df['bot'] = 'unknown'
  for j in item['jobs']:
    bot = j.get('bot', 'unknown')
    df.loc[df['job_id'].str.contains(str(j['id'])), 'bot'] = bot

  return df[[
      'revision', 'timestamp', 'bot', 'label', 'benchmark', 'name', 'mean',
      'count'
  ]]


def _SkipProcessing(item):
  """Return True if not all jobs have finished, or if all of them failed."""
  return (not all(IsJobFinished(job) for job in item['jobs']) or
          all(job['status'] == 'failed' for job in item['jobs']))


def GetLastCommitOfDate(date):
  """Find the latest commit that landed on a given date."""
  # Make sure our local git repo has up-to-date info on origin/master.
  logging.info('Fetching latest origin/master data.')
  subprocess.check_output(
      ['git', 'fetch', 'origin', 'master'], cwd=TOOLS_PERF_DIR,
      stderr=subprocess.STDOUT)

  # Snap the date to the end of the day.
  cutoff_date = date.replace(hour=12).ceil('D')
  logging.info('Finding latest commit before %s.', cutoff_date)
  if not FindCommit(after_date=cutoff_date):
    # We expect there to be some commits after the 'cutoff_date', otherwise
    # there isn't yet a *last* commit before that date.
    raise ValueError("Given date appears to be in the future. There isn't yet "
                     'a last commit before %s.' % cutoff_date)
  return FindCommit(before_date=cutoff_date)


def FindCommit(before_date=None, after_date=None):
  """Find the latest commit, with optional before/after date constraints."""
  # Ask git for '<commit hash>:<commit unix timestamp>' of the most recent
  # matching commit on origin/master.
  cmd = ['git', 'log', '--max-count', '1', '--format=format:%H:%ct']
  if before_date is not None:
    cmd.extend(['--before', before_date.isoformat()])
  if after_date is not None:
    cmd.extend(['--after', after_date.isoformat()])
  cmd.append('origin/master')
  line = subprocess.check_output(cmd, cwd=TOOLS_PERF_DIR).strip()
  if line:
    revision, commit_time = line.split(':')
    commit_time = pd.Timestamp(
        int(commit_time), unit='s', tz=TZ).isoformat()
    return revision, commit_time
  else:
    return None


def RevisionResultsFile(item):
  """Get the file path where job results for a single revision are cached."""
  return CachedFilePath('job_results', item['revision'] + '.csv')


def CachedFilePath(arg, *args):
  """Get the path to a file stored in the local cache."""
  return os.path.join(CACHED_DATA_DIR, arg, *args)


def UploadToCloudStorage(filepath):
  """Copy the given file to cloud storage."""
  gsutil.Copy(
      filepath, posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)))


def DownloadFromCloudStorage(filepath):
  """Get the given file from cloud storage."""
  try:
    gsutil.Copy(
        posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)), filepath)
    logging.info('Downloaded copy of %s from cloud storage.', filepath)
    return True
  except subprocess.CalledProcessError:
    logging.info('Failed to download copy of %s from cloud storage.', filepath)
    return False


def LoadJsonFile(filename):
  with open(filename) as f:
    return json.load(f)


def TimeAgo(**kwargs):
  return pd.Timestamp.now(TZ) - pd.DateOffset(**kwargs)


def SetUpLogging(level):
  """Set up logging to log both to stderr and a file."""
  logger = logging.getLogger()
  logger.setLevel(level)
  formatter = logging.Formatter(
      '(%(levelname)s) %(asctime)s [%(module)s] %(message)s')

  h1 = logging.StreamHandler()
  h1.setFormatter(formatter)
  logger.addHandler(h1)

  h2 = handlers.TimedRotatingFileHandler(
      filename=CachedFilePath('pinboard.log'), when='W0', backupCount=5)
  h2.setFormatter(formatter)
  logger.addHandler(h2)


def Main():
  SetUpLogging(level=logging.INFO)
  actions = ('start', 'collect', 'upload')
  parser = argparse.ArgumentParser()
  parser.add_argument(
      'actions', metavar='ACTION', nargs='+', choices=actions + ('auto',),
      help=("select action to perform: 'start' pinpoint jobs, 'collect' job "
            "results, 'upload' aggregated data, or 'auto' to do all in "
            "sequence."))
  parser.add_argument(
      '--date', type=lambda s: pd.Timestamp(s, tz=TZ), default=TimeAgo(days=1),
      help=('Run jobs for the last commit landed on the given date (assuming '
            'MTV time). Defaults to the last commit landed yesterday.'))
  args = parser.parse_args()
  if 'auto' in args.actions:
    logging.info('=== auto run for %s ===', args.date)
    args.actions = actions

  cached_results_dir = CachedFilePath('job_results')
  if not os.path.isdir(cached_results_dir):
    os.makedirs(cached_results_dir)

  state = LoadJobsState()
  try:
    if 'start' in args.actions:
      StartPinpointJobs(state, args.date)
    new_items, cached_df = GetItemsToUpdate(state)
    if 'collect' in args.actions:
      CollectPinpointResults(new_items)
  finally:
    UpdateJobsState(state)

  if 'upload' in args.actions:
    AggregateAndUploadResults(new_items, cached_df)

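
if __name__ == '__main__':
  # Minimal entry point so the module can be run directly as a script; Main()
  # may also be invoked by an external wrapper.
  Main()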