1# Copyright 2019 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import argparse 6import filecmp 7import json 8import logging 9from logging import handlers 10import os 11import posixpath 12import shutil 13import subprocess 14 15from core.external_modules import pandas as pd 16from core.external_modules import numpy as np 17from core import gsutil 18from py_utils import tempfile_ext 19 20 21PINBOARD_DIR = os.path.abspath(os.path.dirname(__file__)) 22TOOLS_PERF_DIR = os.path.normpath(os.path.join(PINBOARD_DIR, '..', '..')) 23CACHED_DATA_DIR = os.path.join(TOOLS_PERF_DIR, '_cached_data', 'pinboard') 24 25PINPOINT_CLI = os.path.join(TOOLS_PERF_DIR, 'pinpoint_cli') 26JOB_CONFIGS_PATH = os.path.join(PINBOARD_DIR, 'job_configs.json') 27 28JOBS_STATE_FILE = 'jobs_state.json' 29DATASET_PKL_FILE = 'dataset.pkl' 30DATASET_CSV_FILE = 'dataset.csv' 31 32CLOUD_STORAGE_DIR = 'gs://chrome-health-tvdata/pinboard' 33TZ = 'America/Los_Angeles' # MTV-time. 34 35 36# Only these are exported and uploaded to the Cloud Storage dataset. 37MEASUREMENTS = set([ 38 # V8 metrics. 39 'JavaScript:duration', 40 'Optimize-Background:duration', 41 'Optimize:duration', 42 'RunsPerMinute', 43 'Total-Main-Thread:duration', 44 'Total:duration', 45 'V8-Only-Main-Thread:duration', 46 'V8-Only:duration', 47 'memory:chrome:renderer_processes:reported_by_chrome:v8:effective_size', 48 'total:500ms_window:renderer_eqt:v8', 49 50 # Startup metrics. 51 'experimental_content_start_time', 52 'experimental_navigation_start_time', 53 'first_contentful_paint_time', 54 'messageloop_start_time', 55 'navigation_commit_time', 56]) 57 58# Compute averages over a fixed set of active stories. These may need to be 59# periodically updated. 60ACTIVE_STORIES = set([ 61 # v8.browsing_mobile. 62 'browse:chrome:newtab:2019', 63 'browse:chrome:omnibox:2019', 64 'browse:media:facebook_photos:2019', 65 'browse:media:flickr_infinite_scroll:2019', 66 'browse:media:googleplaystore:2019', 67 'browse:media:imgur:2019', 68 'browse:media:youtube:2019', 69 'browse:news:cricbuzz:2019', 70 'browse:news:globo:2019', 71 'browse:news:nytimes:2019', 72 'browse:news:qq:2019', 73 'browse:news:reddit:2019', 74 'browse:news:toi:2019', 75 'browse:shopping:amazon:2019', 76 'browse:news:washingtonpost:2019', 77 'browse:search:amp:sxg:2019', 78 'browse:shopping:amazon:2019', 79 'browse:shopping:avito:2019', 80 'browse:shopping:flipkart:2019', 81 'browse:shopping:lazada:2019', 82 'browse:social:facebook:2019', 83 'browse:social:instagram:2019', 84 'browse:social:twitter:2019', 85 'browse:tools:maps:2019', 86 87 # v8.browsing_desktop. 88 'browse:news:nytimes:2020', 89 'browse:news:flipboard:2018', 90 'browse:social:facebook_infinite_scroll:2018', 91 'browse:tools:sheets:2019', 92 'browse:media:tumblr:2018', 93 'browse:tools:maps:2019', 94 'browse:social:twitter_infinite_scroll:2018', 95 'browse:tech:discourse_infinite_scroll:2018', 96 'browse:social:twitter:2018', 97 'browse:social:tumblr_infinite_scroll:2018', 98 'browse:media:googleplaystore:2018', 99 'browse:search:google:2018', 100 'browse:news:cnn:2018', 101 'browse:news:reddit:2018', 102 'browse:search:google_india:2018', 103 'browse:media:youtubetv:2019', 104 105 # Speedometer2. 106 'Speedometer2', 107]) 108 109 110def StartPinpointJobs(state, date): 111 """Start new pinpoint jobs for the last commit on the given date.""" 112 revision, timestamp = GetLastCommitOfDate(date) 113 if any(item['revision'] == revision for item in state): 114 logging.info('No new jobs to start.') 115 return 116 117 # Add a new item to the state with info about jobs for this revision. 118 logging.info('Starting jobs for %s (%s):', timestamp[:10], revision) 119 item = {'revision': revision, 'timestamp': timestamp, 'jobs': []} 120 configs = LoadJsonFile(JOB_CONFIGS_PATH) 121 for config in configs: 122 config['base_git_hash'] = revision 123 with tempfile_ext.NamedTemporaryFile() as tmp: 124 json.dump(config, tmp) 125 tmp.close() 126 output = subprocess.check_output( 127 ['vpython', PINPOINT_CLI, 'start-job', tmp.name], 128 universal_newlines=True).strip() 129 logging.info(output) 130 assert 'https://pinpoint' in output 131 bot = config['configuration'] 132 item['jobs'].append({ 133 'id': output.split('/')[-1], 134 'status': 'queued', 135 'bot': bot 136 }) 137 state.append(item) 138 state.sort(key=lambda p: p['timestamp']) # Keep items sorted by date. 139 140 141def IsJobFinished(job): 142 return job['status'] in ['completed', 'failed'] 143 144 145def CollectPinpointResults(state): 146 """Check the status of pinpoint jobs and collect their results.""" 147 # First iterate over all running jobs, and update their status. 148 for item in state: 149 active = [job['id'] for job in item['jobs'] if not IsJobFinished(job)] 150 if not active: 151 continue 152 cmd = ['vpython', PINPOINT_CLI, 'status'] 153 cmd.extend(active) 154 output = subprocess.check_output(cmd, universal_newlines=True) 155 updates = dict(line.split(': ', 1) for line in output.splitlines()) 156 logging.info('Got job updates: %s.', updates) 157 for job in item['jobs']: 158 if job['id'] in updates: 159 job['status'] = updates[job['id']] 160 161 # Now iterate over all completed jobs, and download their results if needed. 162 for item in state: 163 if _SkipProcessing(item): # Skip if not ready or all failed. 164 continue 165 output_file = RevisionResultsFile(item) 166 if not os.path.exists(output_file): 167 cmd = ['vpython', PINPOINT_CLI, 'get-csv', '--output', output_file, '--'] 168 job_ids = [j['id'] for j in item['jobs'] if j['status'] == 'completed'] 169 logging.info('Getting csv data for commit: %s.', item['revision']) 170 subprocess.check_output(cmd + job_ids) 171 172 173def LoadJobsState(): 174 """Load the latest recorded state of pinpoint jobs.""" 175 local_path = CachedFilePath(JOBS_STATE_FILE) 176 if os.path.exists(local_path) or DownloadFromCloudStorage(local_path): 177 return LoadJsonFile(local_path) 178 else: 179 logging.info('No jobs state found. Creating empty state.') 180 return [] 181 182 183def UpdateJobsState(state): 184 """Write back the updated state of pinpoint jobs. 185 186 If there were any changes to the state, i.e. new jobs were created or 187 existing ones completed, both the local cached copy and the backup in cloud 188 storage are updated. 189 """ 190 local_path = CachedFilePath(JOBS_STATE_FILE) 191 with tempfile_ext.NamedTemporaryFile() as tmp: 192 json.dump(state, tmp, sort_keys=True, indent=2, separators=(',', ': ')) 193 tmp.close() 194 if not os.path.exists(local_path) or not filecmp.cmp(tmp.name, local_path): 195 shutil.copyfile(tmp.name, local_path) 196 UploadToCloudStorage(local_path) 197 198 199def GetCachedDataset(): 200 """Load the latest dataset with cached data.""" 201 local_path = CachedFilePath(DATASET_PKL_FILE) 202 if os.path.exists(local_path) or DownloadFromCloudStorage(local_path): 203 return pd.read_pickle(local_path) 204 else: 205 return None 206 207 208def UpdateCachedDataset(df): 209 """Write back the dataset with cached data.""" 210 local_path = CachedFilePath(DATASET_PKL_FILE) 211 df.to_pickle(local_path) 212 UploadToCloudStorage(local_path) 213 214 215def GetItemsToUpdate(state): 216 """Select jobs with new data to download and cached data for existing jobs. 217 218 This also filters out old revisions to keep only recent (6 months) data. 219 220 Returns: 221 new_items: A list of job items from which to get data. 222 cached_df: A DataFrame with existing cached data, may be None. 223 """ 224 from_date = str(TimeAgo(months=6).date()) 225 new_items = [item for item in state if item['timestamp'] > from_date] 226 df = GetCachedDataset() 227 if df is not None: 228 recent_revisions = set(item['revision'] for item in new_items) 229 df = df[df['revision'].isin(recent_revisions)] 230 known_revisions = set(df['revision']) 231 new_items = [ 232 item for item in new_items if item['revision'] not in known_revisions] 233 return new_items, df 234 235 236def AggregateAndUploadResults(new_items, cached_df=None): 237 """Aggregate results collected and upload them to cloud storage.""" 238 dfs = [] 239 if cached_df is not None: 240 dfs.append(cached_df) 241 242 found_new = False 243 for item in new_items: 244 if _SkipProcessing(item): # Jobs are not ready, or all have failed. 245 continue 246 if not found_new: 247 logging.info('Processing data from new results:') 248 found_new = True 249 logging.info('- %s (%s)', item['timestamp'][:10], item['revision']) 250 dfs.append(GetRevisionResults(item)) 251 252 if not found_new: 253 logging.info('No new data found.') 254 return 255 256 # Otherwise update our cache and upload. 257 df = pd.concat(dfs, ignore_index=True) 258 UpdateCachedDataset(df) 259 260 # Drop revisions with no results and mark the last result for each metric, 261 # both with/without patch, as a 'reference'. This allows making score cards 262 # comparing their most recent results in Data Studio dashboards. 263 df = df[df['count'] > 0].copy() 264 latest_result = df.groupby( 265 ['label', 'benchmark', 'name'])['timestamp'].transform('max') 266 df['reference'] = df['timestamp'] == latest_result 267 268 dataset_file = CachedFilePath(DATASET_CSV_FILE) 269 df.to_csv(dataset_file, index=False) 270 UploadToCloudStorage(dataset_file) 271 logging.info('Total %s rows of data uploaded.' % len(df.index)) 272 273 274def GetRevisionResults(item): 275 """Aggregate the results from jobs that ran on a particular revision.""" 276 # First load pinpoint csv results into a DataFrame. The dtype arg is needed 277 # to ensure that job_id's are always read a strings (even if some of them 278 # look like large numbers). 279 df = pd.read_csv(RevisionResultsFile(item), dtype={'job_id': str}) 280 assert df['change'].str.contains(item['revision']).all(), ( 281 'Not all results match the expected git revision') 282 283 # Filter out and keep only the measurements and stories that we want. 284 df = df[df['name'].isin(MEASUREMENTS)] 285 df = df[df['story'].isin(ACTIVE_STORIES)] 286 287 if not df.empty: 288 # Aggregate over the results of individual stories. 289 df = df.groupby(['change', 'job_id', 'name', 'benchmark', 290 'unit'])['mean'].agg(['mean', 'count']).reset_index() 291 else: 292 # Otherwise build a single row with an "empty" aggregate for this revision. 293 # This is needed so we can remember in the cache that this revision has 294 # been processed. 295 df = pd.DataFrame(index=[0]) 296 df['change'] = item['revision'] 297 df['job_id'] = '(missing)' 298 df['name'] = '(missing)' 299 df['benchmark'] = '(missing)' 300 df['unit'] = '' 301 df['mean'] = np.nan 302 df['count'] = 0 303 304 # Convert time units from milliseconds to seconds. This is what Data Studio 305 # dashboards expect. 306 is_ms_unit = df['unit'].str.startswith('ms_') 307 df.loc[is_ms_unit, 'mean'] = df['mean'] / 1000 308 309 # Distinguish jobs that ran with/without the tested patch. 310 df['label'] = df['change'].str.contains(r'\+').map( 311 {False: 'without_patch', True: 'with_patch'}) 312 313 # Add timestamp and revision information. We snap the date to noon and make 314 # it naive (i.e. no timezone), so the dashboard doesn't get confused with 315 # dates close to the end of day. 316 date = item['timestamp'].split('T')[0] + 'T12:00:00' 317 df['timestamp'] = pd.Timestamp(date) 318 df['revision'] = item['revision'] 319 320 # Fake the timestamp of jobs without the patch to appear as if they ran a 321 # year ago; this makes it easier to visualize and compare timeseries from 322 # runs with/without the patch in Data Studio dashboards. 323 df.loc[df['label'] == 'without_patch', 'timestamp'] = ( 324 df['timestamp'] - pd.DateOffset(years=1)) 325 326 df['bot'] = 'unknown' 327 for j in item['jobs']: 328 bot = j.get('bot', 'unknown') 329 df.loc[df['job_id'].str.contains(str(j['id'])), 'bot'] = bot 330 331 return df[[ 332 'revision', 'timestamp', 'bot', 'label', 'benchmark', 'name', 'mean', 333 'count' 334 ]] 335 336 337def _SkipProcessing(item): 338 """Return True if not all jobs have finished or all have failed.""" 339 return (not all(IsJobFinished(job) for job in item['jobs']) or 340 all(job['status'] == 'failed' for job in item['jobs'])) 341 342 343def GetLastCommitOfDate(date): 344 """"Find the the lastest commit that landed on a given date.""" 345 # Make sure our local git repo has up to date info on origin/master. 346 logging.info('Fetching latest origin/master data.') 347 subprocess.check_output( 348 ['git', 'fetch', 'origin', 'master'], cwd=TOOLS_PERF_DIR, 349 stderr=subprocess.STDOUT) 350 351 # Snap the date to the end of the day. 352 cutoff_date = date.replace(hour=12).ceil('D') 353 logging.info('Finding latest commit before %s.', cutoff_date) 354 if not FindCommit(after_date=cutoff_date): 355 # We expect there to be some commits after the 'cutoff_date', otherwise 356 # there isn't yet a *last* commit before that date. 357 raise ValueError("Given date appears to be in the future. There isn't yet " 358 'a last commit before %s.' % cutoff_date) 359 return FindCommit(before_date=cutoff_date) 360 361 362def FindCommit(before_date=None, after_date=None): 363 """Find latest commit with optional before/after date constraints.""" 364 cmd = ['git', 'log', '--max-count', '1', '--format=format:%H:%ct'] 365 if before_date is not None: 366 cmd.extend(['--before', before_date.isoformat()]) 367 if after_date is not None: 368 cmd.extend(['--after', after_date.isoformat()]) 369 cmd.append('origin/master') 370 line = subprocess.check_output(cmd, cwd=TOOLS_PERF_DIR).strip() 371 if line: 372 revision, commit_time = line.split(':') 373 commit_time = pd.Timestamp( 374 int(commit_time), unit='s', tz=TZ).isoformat() 375 return revision, commit_time 376 else: 377 return None 378 379 380def RevisionResultsFile(item): 381 """Get a filepath where to cache results of jobs for a single revision.""" 382 return CachedFilePath('job_results', item['revision'] + '.csv') 383 384 385def CachedFilePath(arg, *args): 386 """Get the path to a file stored in local cache.""" 387 return os.path.join(CACHED_DATA_DIR, arg, *args) 388 389 390def UploadToCloudStorage(filepath): 391 """Copy the given file to cloud storage.""" 392 gsutil.Copy( 393 filepath, posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath))) 394 395 396def DownloadFromCloudStorage(filepath): 397 """Get the given file from cloud storage.""" 398 try: 399 gsutil.Copy( 400 posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)), filepath) 401 logging.info('Downloaded copy of %s from cloud storage.', filepath) 402 return True 403 except subprocess.CalledProcessError: 404 logging.info('Failed to download copy of %s from cloud storage.', filepath) 405 return False 406 407 408def LoadJsonFile(filename): 409 with open(filename) as f: 410 return json.load(f) 411 412 413def TimeAgo(**kwargs): 414 return pd.Timestamp.now(TZ) - pd.DateOffset(**kwargs) 415 416 417def SetUpLogging(level): 418 """Set up logging to log both to stderr and a file.""" 419 logger = logging.getLogger() 420 logger.setLevel(level) 421 formatter = logging.Formatter( 422 '(%(levelname)s) %(asctime)s [%(module)s] %(message)s') 423 424 h1 = logging.StreamHandler() 425 h1.setFormatter(formatter) 426 logger.addHandler(h1) 427 428 h2 = handlers.TimedRotatingFileHandler( 429 filename=CachedFilePath('pinboard.log'), when='W0', backupCount=5) 430 h2.setFormatter(formatter) 431 logger.addHandler(h2) 432 433 434def Main(): 435 SetUpLogging(level=logging.INFO) 436 actions = ('start', 'collect', 'upload') 437 parser = argparse.ArgumentParser() 438 parser.add_argument( 439 'actions', metavar='ACTION', nargs='+', choices=actions + ('auto',), 440 help=("select action to perform: 'start' pinpoint jobs, 'collect' job " 441 "results, 'upload' aggregated data, or 'auto' to do all in " 442 "sequence.")) 443 parser.add_argument( 444 '--date', type=lambda s: pd.Timestamp(s, tz=TZ), default=TimeAgo(days=1), 445 help=('Run jobs for the last commit landed on the given date (assuming ' 446 'MTV time). Defaults to the last commit landed yesterday.')) 447 args = parser.parse_args() 448 if 'auto' in args.actions: 449 logging.info('=== auto run for %s ===', args.date) 450 args.actions = actions 451 452 cached_results_dir = CachedFilePath('job_results') 453 if not os.path.isdir(cached_results_dir): 454 os.makedirs(cached_results_dir) 455 456 state = LoadJobsState() 457 try: 458 if 'start' in args.actions: 459 StartPinpointJobs(state, args.date) 460 new_items, cached_df = GetItemsToUpdate(state) 461 if 'collect' in args.actions: 462 CollectPinpointResults(new_items) 463 finally: 464 UpdateJobsState(state) 465 466 if 'upload' in args.actions: 467 AggregateAndUploadResults(new_items, cached_df) 468