# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import collections
import functools
import itertools
import logging

from dashboard.common import timing

from google.appengine.ext import ndb
from google.appengine.ext import db

__all__ = (
    'PopulateTaskGraph',
    'TaskGraph',
    'TaskVertex',
    'Dependency',
    'Evaluate',
    'ExtendTaskGraph',
    'UpdateTask',
    'AppendTasklog',
)

# Value types describing a task graph before it is persisted: a vertex is a task
# to create, a dependency records that the task `from_` depends on the task
# `to`, and a graph bundles a collection of vertices with a collection of
# dependency edges.
TaskVertex = collections.namedtuple('TaskVertex', 'id vertex_type payload')
Dependency = collections.namedtuple('Dependency', 'from_ to')
TaskGraph = collections.namedtuple('TaskGraph', 'vertices edges')

# These InMemoryTask instances are meant to isolate the Task model which is
# actually persisted in Datastore.
InMemoryTask = collections.namedtuple(
    'InMemoryTask', 'id task_type payload status dependencies')

# Maps each task status to the set of statuses it is allowed to move to. The
# 'cancelled', 'completed' and 'failed' statuses may only go back to 'pending'.
VALID_TRANSITIONS = {
    'pending': {'ongoing', 'completed', 'failed', 'cancelled'},
    'ongoing': {'completed', 'failed', 'cancelled'},
    'cancelled': {'pending'},
    'completed': {'pending'},
    'failed': {'pending'},
}

# Traversal states used in the graph traversal. We use these as marks for when
# vertices are traversed, the same way graph colouring is used to implement a
# graph traversal such as Depth First Search.
NOT_EVALUATED, CHILDREN_PENDING, EVALUATION_DONE = (0, 1, 2)

# The task graph as loaded back from Datastore by `_LoadTaskGraph`:
# `terminal_tasks` is a list of keys of tasks that no other task depends on, and
# `tasks` maps each task key to its `Task` entity.
ReconstitutedTaskGraph = collections.namedtuple('ReconstitutedTaskGraph',
                                                ('terminal_tasks', 'tasks'))


class Error(Exception):
  """Base class for the errors raised by this module."""


class InvalidAmendment(Error):
  """Raised by ExtendTaskGraph when an amendment conflicts with the graph."""


class TaskNotFound(Error):
  """Raised by UpdateTask when the task does not exist under the job."""


class InvalidTransition(Error):
  """Raised by UpdateTask for unknown states or disallowed transitions."""


# These are internal-only models, used as an implementation detail of the
# execution engine.
class Task(ndb.Model):
  """A Task associated with a Pinpoint Job.

  Task instances are always associated with a Job (they are stored with the
  Job's key as their parent). Tasks represent units of work that are in
  well-defined states. Updates to Task instances are transactional and need to
  be.
  """
  task_type = ndb.StringProperty(required=True)
  status = ndb.StringProperty(required=True, choices=VALID_TRANSITIONS.keys())
  payload = ndb.JsonProperty(compressed=True, indexed=False)
  # Keys of the tasks this task depends on; they share the same Job parent.
  dependencies = ndb.KeyProperty(repeated=True, kind='Task')
  created = ndb.DateTimeProperty(required=True, auto_now_add=True)
  # `auto_now` refreshes this timestamp on every put(). With `auto_now_add` it
  # was frozen at creation time and merely duplicated `created`.
  updated = ndb.DateTimeProperty(required=True, auto_now=True)

  def ToInMemoryTask(self):
    """Returns an InMemoryTask view of this entity.

    We isolate the ndb model `Task` from the evaluator, to avoid accidentally
    modifying the state in Datastore. Dependencies are reported as task ids
    rather than ndb keys. Note that `payload` is the same object, not a copy.
    """
    return InMemoryTask(
        id=self.key.id(),
        task_type=self.task_type,
        payload=self.payload,
        status=self.status,
        dependencies=[dep.id() for dep in self.dependencies])


class TaskLog(ndb.Model):
  """Log entries associated with Task instances.

  TaskLog instances are always associated with a Task (AppendTasklog stores
  them with the Task's key as their parent). These entries are immutable once
  created.
  """
  timestamp = ndb.DateTimeProperty(
      required=True, auto_now_add=True, indexed=False)
  message = ndb.TextProperty()
  payload = ndb.JsonProperty(compressed=True, indexed=False)


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def PopulateTaskGraph(job, graph):
  """Populate the Datastore with Task instances associated with a Job.

  Every task is created in the 'pending' state, and all of them are written in
  a single independent transaction.

  Args:
    job: the Job owning the tasks; its key becomes each task's parent.
    graph: an object with two properties: a collection of `TaskVertex`
      instances named `vertices` and a collection of `Dependency` instances
      named `edges` (e.g. a `TaskGraph`). Duplicate edges are recorded once.

  Raises:
    ValueError: if `job` is None.
    KeyError: if an edge's `from_` is not the id of one of `vertices`.
  """
  if job is None:
    raise ValueError('job must not be None.')

  job_key = job.key
  tasks = {
      v.id: Task(
          key=ndb.Key(Task, v.id, parent=job_key),
          task_type=v.vertex_type,
          payload=v.payload,
          status='pending') for v in graph.vertices
  }
  seen_dependencies = set()
  for dependency in graph.edges:
    if dependency in seen_dependencies:
      continue
    tasks[dependency.from_].dependencies.append(
        ndb.Key(Task, dependency.to, parent=job_key))
    seen_dependencies.add(dependency)

  ndb.put_multi(tasks.values(), use_cache=True)


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def ExtendTaskGraph(job, vertices, dependencies):
  """Add new vertices and dependency links to the graph.

  New vertices start in the 'pending' state. A dependency may originate from
  either a new vertex or a task already in the graph; only the new tasks and
  the existing tasks that gained a dependency are written back.

  Args:
    job: a dashboard.pinpoint.model.job.Job instance.
    vertices: an iterable of TaskVertex instances.
    dependencies: an iterable of Dependency instances.

  Raises:
    ValueError: if `job` is None.
    InvalidAmendment: if a new vertex is already in the graph, or a dependency's
      `from_` is neither an existing task nor one of the new vertices.
  """
  if job is None:
    raise ValueError('job must not be None.')
  if not vertices and not dependencies:
    return

  job_key = job.key
  amendment_task_graph = {
      v.id: Task(
          key=ndb.Key(Task, v.id, parent=job_key),
          task_type=v.vertex_type,
          status='pending',
          payload=v.payload) for v in vertices
  }

  # Ensure that the keys we're adding are not in the graph yet.
  current_tasks = Task.query(ancestor=job_key).fetch()
  current_task_keys = set(t.key for t in current_tasks)
  new_task_keys = set(t.key for t in amendment_task_graph.values())
  overlap = new_task_keys & current_task_keys
  if overlap:
    raise InvalidAmendment('vertices (%r) already in task graph.' % (overlap,))

  # Then we add the dependencies.
  current_task_graph = {t.key.id(): t for t in current_tasks}
  handled_dependencies = set()
  # Ids of tasks to write back: every new task, plus each existing task that
  # gains a dependency in the loop below.
  update_filter = set(amendment_task_graph)
  for dependency in dependencies:
    if dependency in handled_dependencies:
      continue
    current_task = current_task_graph.get(dependency.from_)
    amendment_task = amendment_task_graph.get(dependency.from_)
    if current_task is None and amendment_task is None:
      raise InvalidAmendment('dependency `from` (%s) not in amended graph.' %
                             (dependency.from_,))
    dependency_key = ndb.Key(Task, dependency.to, parent=job_key)
    if current_task is not None:
      current_task.dependencies.append(dependency_key)
    if amendment_task is not None:
      amendment_task.dependencies.append(dependency_key)
    handled_dependencies.add(dependency)
    update_filter.add(dependency.from_)

  ndb.put_multi(
      itertools.chain(
          amendment_task_graph.values(),
          [t for id_, t in current_task_graph.items() if id_ in update_filter]),
      use_cache=True)


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def UpdateTask(job, task_id, new_state=None, payload=None):
  """Update a task.

  This enforces that the status transitions are semantically correct, where only
  the transitions defined in the VALID_TRANSITIONS map are allowed.

  When either new_state or payload are not None, this function performs the
  update transactionally. At least one of `new_state` or `payload` must be
  provided in calls to this function. A provided payload replaces the stored
  payload wholesale, even when it is empty.

  Raises:
    ValueError: if neither `new_state` nor `payload` is provided.
    InvalidTransition: if `new_state` is not a known state, or is not reachable
      from the task's current status.
    TaskNotFound: if no task with `task_id` exists under `job`.
  """
  if new_state is None and payload is None:
    raise ValueError('Set one of `new_state` or `payload`.')

  # Explicit `is not None` checks: falsy-but-provided values (an empty payload
  # or an empty state string) must not be silently ignored.
  if new_state is not None and new_state not in VALID_TRANSITIONS:
    raise InvalidTransition('Unknown state: %s' % (new_state,))

  task = Task.get_by_id(task_id, parent=job.key)
  if task is None:
    raise TaskNotFound('Task with id "%s" not found for job "%s".' %
                       (task_id, job.job_id))

  if new_state is not None:
    valid_transitions = VALID_TRANSITIONS.get(task.status)
    if new_state not in valid_transitions:
      raise InvalidTransition(
          'Attempting transition from "%s" to "%s" not in %s; task = %s' %
          (task.status, new_state, valid_transitions, task))
    task.status = new_state

  if payload is not None:
    task.payload = payload

  task.put()


def LogStateTransitionFailures(wrapped_action):
  """Decorator to log state transition failures.

  This is a convenience decorator to handle state transition failures, and
  suppress further exception propagation of the transition failure.

  `InvalidTransition` and `db.TransactionFailedError` raised by the wrapped
  callable are logged at error level and swallowed, and the wrapper returns
  None. Any other exception propagates unchanged.
  """

  @functools.wraps(wrapped_action)
  def ActionWrapper(*args, **kwargs):
    try:
      return wrapped_action(*args, **kwargs)
    except InvalidTransition as e:
      logging.error('State transition failed: %s', e)
      return None
    except db.TransactionFailedError as e:
      logging.error('Transaction failed: %s', e)
      return None

  return ActionWrapper


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def AppendTasklog(job, task_id, message, payload):
  """Stores a new TaskLog entry under task `task_id` of `job`.

  The entry is written in its own independent transaction. The parent Task is
  not read here, so its existence is not checked.
  """
  task_log = TaskLog(
      parent=ndb.Key(Task, task_id, parent=job.key),
      message=message,
      payload=payload)
  task_log.put()


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def _LoadTaskGraph(job):
  """Loads all tasks of `job` and identifies the terminal ones.

  Returns:
    A ReconstitutedTaskGraph whose `terminal_tasks` lists the keys of tasks that
    no other task depends on, and whose `tasks` maps each task key to its
    entity.
  """
  with timing.WallTimeLogger('ExecutionEngine:_LoadTaskGraph'):
    tasks = Task.query(ancestor=job.key).fetch()
    # The way we get the terminal tasks is by looking at tasks where nothing
    # depends on them.
    has_dependents = set()
    for task in tasks:
      has_dependents.update(task.dependencies)
    terminal_tasks = [t.key for t in tasks if t.key not in has_dependents]
    return ReconstitutedTaskGraph(
        terminal_tasks=terminal_tasks, tasks={task.key: task for task in tasks})


class NoopAction(object):
  """An action that does nothing.

  Evaluate seeds its action list with one of these, so that the first loop
  iteration goes straight to a graph traversal.
  """

  @staticmethod
  def __str__():
    return 'NoopAction()'

  @staticmethod
  def __call__(_):
    pass


@ndb.non_transactional
@timing.TimeWall('ExecutionEngine:Evaluate')
def Evaluate(job, event, evaluator):
  """Applies an evaluator given a task in the task graph and an event as input.

  This function implements a depth-first search traversal of the task graph and
  applies the `evaluator` given a task and the event input in post-order
  traversal. We start the DFS from the terminal tasks (those that no other task
  depends on) and call the `evaluator` function with a representation of the
  task in the graph, an `event` as input, and an accumulator argument. If every
  task has a dependent (the graph contains a cycle), the traversal starts from
  an arbitrary task instead.

  The `evaluator` must be a callable which accepts three arguments:

    - task: an InMemoryTask instance, representing a task in the graph.
    - event: an object whose shape/type is defined by the caller of the
      `Evaluate` function and that the evaluator can handle.
    - accumulator: a dictionary which is mutable which is valid in the scope of
      a traversal of the graph.

  The `evaluator` must return either None or an iterable of callables which take
  a single argument, which is the accumulator at the end of a traversal.

  Events are free-form but usually are dictionaries which constitute inputs that
  are external to the task graph evaluation. This could model events in an
  event-driven evaluation of tasks, or synthetic inputs to the system. It is
  more important that the `event` information is known to the evaluator
  implementation, and is provided as-is to the evaluator in this function.

  The Evaluate function will keep iterating while there are actions still being
  produced by the evaluator. When there are no more actions to run, the Evaluate
  function will return the most recent traversal's accumulator. If the job has
  no tasks at all, it returns None.

  Raises:
    ValueError: if `job` is None.
  """
  if job is None:
    raise ValueError('job must not be None.')

  accumulator = {}
  actions = [NoopAction()]
  while actions:
    for action in actions:
      logging.debug('Running action: %s', action)
      # Each action should be a callable which takes the accumulator as an
      # input. We want to run each action in their own transaction as well.
      # This must not be called in a transaction.
      with timing.WallTimeLogger('ExecutionEngine:ActionRunner<%s>' %
                                 (type(action).__name__,)):
        action(accumulator)

    # Clear the actions and accumulator for this traversal.
    del actions[:]
    accumulator.clear()

    # Load the graph transactionally.
    graph = _LoadTaskGraph(job)

    if not graph.tasks:
      logging.debug('Task graph empty for job %s', job.job_id)
      return None

    # First get all the "terminal" tasks, and traverse the dependencies in a
    # depth-first-search.
    task_stack = [graph.tasks[task] for task in graph.terminal_tasks]

    # If the stack is empty, we should start at an arbitrary point. Dict views
    # are not indexable in Python 3, hence next(iter(...)) instead of [0].
    if not task_stack:
      task_stack = [next(iter(graph.tasks.values()))]
    vertex_states = {}
    while task_stack:
      task = task_stack[-1]
      state = vertex_states.get(task.key, NOT_EVALUATED)
      if state == CHILDREN_PENDING:
        # Every dependency has been handled, so evaluate in post-order.
        in_memory_task = task.ToInMemoryTask()
        result_actions = evaluator(in_memory_task, event, accumulator)
        if result_actions:
          actions.extend(result_actions)
        vertex_states[task.key] = EVALUATION_DONE
      elif state == NOT_EVALUATED:
        # This vertex is coloured white, we should traverse the dependencies.
        vertex_states[task.key] = CHILDREN_PENDING
        for dependency in task.dependencies:
          if vertex_states.get(dependency, NOT_EVALUATED) == NOT_EVALUATED:
            task_stack.append(graph.tasks[dependency])
      else:
        # A task can be pushed more than once (shared dependencies); any copy
        # found after evaluation is simply discarded.
        assert state == EVALUATION_DONE
        task_stack.pop()

  return accumulator