# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import collections
import functools
import itertools
import logging

from dashboard.common import timing

from google.appengine.ext import ndb
from google.appengine.ext import db

__all__ = (
    'PopulateTaskGraph',
    'TaskGraph',
    'TaskVertex',
    'Dependency',
    'Evaluate',
    'ExtendTaskGraph',
    'UpdateTask',
    'AppendTasklog',
)

TaskVertex = collections.namedtuple('TaskVertex',
                                    ('id', 'vertex_type', 'payload'))
Dependency = collections.namedtuple('Dependency', ('from_', 'to'))
TaskGraph = collections.namedtuple('TaskGraph', ('vertices', 'edges'))

# InMemoryTask instances isolate the evaluator from the Task model, which is
# what actually gets persisted in Datastore.
InMemoryTask = collections.namedtuple(
    'InMemoryTask', ('id', 'task_type', 'payload', 'status', 'dependencies'))

VALID_TRANSITIONS = {
    'pending': {'ongoing', 'completed', 'failed', 'cancelled'},
    'ongoing': {'completed', 'failed', 'cancelled'},
    'cancelled': {'pending'},
    'completed': {'pending'},
    'failed': {'pending'},
}
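# For example, a task may legally move 'pending' -> 'ongoing' -> 'failed' ->
# 'pending' (a retry), but never directly from 'completed' to 'ongoing'.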

# Traversal states used in the graph traversal. We use these to mark vertices
# as they are visited, much like graph colouring in a depth-first search.
NOT_EVALUATED, CHILDREN_PENDING, EVALUATION_DONE = (0, 1, 2)

ReconstitutedTaskGraph = collections.namedtuple('ReconstitutedTaskGraph',
                                                ('terminal_tasks', 'tasks'))


class Error(Exception):
  pass


class InvalidAmendment(Error):
  pass


class TaskNotFound(Error):
  pass


class InvalidTransition(Error):
  pass


# These are internal-only models, used as an implementation detail of the
# execution engine.
class Task(ndb.Model):
  """A Task associated with a Pinpoint Job.

  Task instances are always associated with a Job. Tasks represent units of
  work that are in well-defined states. Updates to Task instances are, and
  must be, transactional.
  """
  task_type = ndb.StringProperty(required=True)
  status = ndb.StringProperty(required=True, choices=VALID_TRANSITIONS.keys())
  payload = ndb.JsonProperty(compressed=True, indexed=False)
  dependencies = ndb.KeyProperty(repeated=True, kind='Task')
  created = ndb.DateTimeProperty(required=True, auto_now_add=True)
  updated = ndb.DateTimeProperty(required=True, auto_now_add=True)

  def ToInMemoryTask(self):
    # We isolate the ndb model `Task` from the evaluator, to avoid accidentally
    # modifying the state in Datastore.
    return InMemoryTask(
        id=self.key.id(),
        task_type=self.task_type,
        payload=self.payload,
        status=self.status,
        dependencies=[dep.id() for dep in self.dependencies])

99
100class TaskLog(ndb.Model):
101  """Log entries associated with Task instances.
102
103  TaskLog instances are always associated with a Task. These entries are
104  immutable once created.
105  """
106  timestamp = ndb.DateTimeProperty(
107      required=True, auto_now_add=True, indexed=False)
108  message = ndb.TextProperty()
109  payload = ndb.JsonProperty(compressed=True, indexed=False)
110
111
@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def PopulateTaskGraph(job, graph):
  """Populate the Datastore with Task instances associated with a Job.

  The `graph` argument must have two properties: a collection of `TaskVertex`
  instances named `vertices` and a collection of `Dependency` instances named
  `edges`.
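
  For illustration, a minimal graph might be populated like this (the task ids
  and payloads here are made up; `job` is an existing Job entity):

    graph = TaskGraph(
        vertices=[
            TaskVertex(id='build_abc123', vertex_type='find_isolate',
                       payload={'builder': 'Some Builder'}),
            TaskVertex(id='run_abc123_0', vertex_type='run_test',
                       payload={'dimensions': {}}),
        ],
        edges=[Dependency(from_='run_abc123_0', to='build_abc123')],
    )
    PopulateTaskGraph(job, graph)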
119  """
120  if job is None:
121    raise ValueError('job must not be None.')
122
123  job_key = job.key
124  tasks = {
125      v.id: Task(
126          key=ndb.Key(Task, v.id, parent=job_key),
127          task_type=v.vertex_type,
128          payload=v.payload,
129          status='pending') for v in graph.vertices
130  }
131  dependencies = set()
132  for dependency in graph.edges:
133    dependency_key = ndb.Key(Task, dependency.to, parent=job_key)
134    if dependency not in dependencies:
135      tasks[dependency.from_].dependencies.append(dependency_key)
136      dependencies.add(dependency)
137
138  ndb.put_multi(tasks.values(), use_cache=True)
139

@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def ExtendTaskGraph(job, vertices, dependencies):
  """Add new vertices and dependency links to the graph.

  Args:
    job: a dashboard.pinpoint.model.job.Job instance.
    vertices: an iterable of TaskVertex instances.
    dependencies: an iterable of Dependency instances.
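
  For illustration, an existing graph might be extended with one more test run
  like this (the ids here are made up and assume matching tasks already exist):

    ExtendTaskGraph(
        job,
        vertices=[
            TaskVertex(id='run_abc123_1', vertex_type='run_test',
                       payload={'dimensions': {}}),
        ],
        dependencies=[Dependency(from_='run_abc123_1', to='build_abc123')],
    )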
149  """
150  if job is None:
151    raise ValueError('job must not be None.')
152  if not vertices and not dependencies:
153    return
154
155  job_key = job.key
156  amendment_task_graph = {
157      v.id: Task(
158          key=ndb.Key(Task, v.id, parent=job_key),
159          task_type=v.vertex_type,
160          status='pending',
161          payload=v.payload) for v in vertices
162  }
163
164  # Ensure that the keys we're adding are not in the graph yet.
165  current_tasks = Task.query(ancestor=job_key).fetch()
166  current_task_keys = set(t.key for t in current_tasks)
167  new_task_keys = set(t.key for t in amendment_task_graph.values())
168  overlap = new_task_keys & current_task_keys
169  if overlap:
170    raise InvalidAmendment('vertices (%r) already in task graph.' % (overlap,))
171
172  # Then we add the dependencies.
173  current_task_graph = {t.key.id(): t for t in current_tasks}
174  handled_dependencies = set()
175  update_filter = set(amendment_task_graph)
176  for dependency in dependencies:
177    dependency_key = ndb.Key(Task, dependency.to, parent=job_key)
178    if dependency not in handled_dependencies:
179      current_task = current_task_graph.get(dependency.from_)
180      amendment_task = amendment_task_graph.get(dependency.from_)
181      if current_task is None and amendment_task is None:
182        raise InvalidAmendment('dependency `from` (%s) not in amended graph.' %
183                               (dependency.from_,))
184      if current_task:
185        current_task_graph[dependency.from_].dependencies.append(dependency_key)
186      if amendment_task:
187        amendment_task_graph[dependency.from_].dependencies.append(
188            dependency_key)
189      handled_dependencies.add(dependency)
190      update_filter.add(dependency.from_)
191
192  ndb.put_multi(
193      itertools.chain(
194          amendment_task_graph.values(),
195          [t for id_, t in current_task_graph.items() if id_ in update_filter]),
196      use_cache=True)


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def UpdateTask(job, task_id, new_state=None, payload=None):
  """Update a task.

  This enforces that status transitions are semantically correct: only the
  transitions defined in the VALID_TRANSITIONS map are allowed.

  When either new_state or payload is not None, this function performs the
  update transactionally. At least one of `new_state` or `payload` must be
  provided in calls to this function.
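
  For illustration, a pending task might be marked ongoing like this (the task
  id here is made up):

    UpdateTask(job, 'run_abc123_0', new_state='ongoing')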
209  """
210  if new_state is None and payload is None:
211    raise ValueError('Set one of `new_state` or `payload`.')
212
213  if new_state and new_state not in VALID_TRANSITIONS:
214    raise InvalidTransition('Unknown state: %s' % (new_state,))
215
216  task = Task.get_by_id(task_id, parent=job.key)
217  if not task:
218    raise TaskNotFound('Task with id "%s" not found for job "%s".' %
219                       (task_id, job.job_id))
220
221  if new_state:
222    valid_transitions = VALID_TRANSITIONS.get(task.status)
223    if new_state not in valid_transitions:
224      raise InvalidTransition(
225          'Attempting transition from "%s" to "%s" not in %s; task = %s' %
226          (task.status, new_state, valid_transitions, task))
227    task.status = new_state
228
229  if payload:
230    task.payload = payload
231
232  task.put()
233

def LogStateTransitionFailures(wrapped_action):
  """Decorator to log state transition failures.

  This is a convenience decorator that handles state transition failures and
  suppresses further propagation of the exception. Datastore transaction
  failures are logged and suppressed as well.
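
  For illustration, an action callable might be wrapped like this (`job` and
  `task_id` here stand in for values captured from an enclosing scope):

    @LogStateTransitionFailures
    def MarkTaskOngoing(accumulator):
      del accumulator  # Unused by this action.
      UpdateTask(job, task_id, new_state='ongoing')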
240  """
241
242  @functools.wraps(wrapped_action)
243  def ActionWrapper(*args, **kwargs):
244    try:
245      return wrapped_action(*args, **kwargs)
246    except InvalidTransition as e:
247      logging.error('State transition failed: %s', e)
248      return None
249    except db.TransactionFailedError as e:
250      logging.error('Transaction failed: %s', e)
251      return None
252
253  return ActionWrapper
254

@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def AppendTasklog(job, task_id, message, payload):
  task_log = TaskLog(
      parent=ndb.Key(Task, task_id, parent=job.key),
      message=message,
      payload=payload)
  task_log.put()


@ndb.transactional(propagation=ndb.TransactionOptions.INDEPENDENT, retries=0)
def _LoadTaskGraph(job):
  with timing.WallTimeLogger('ExecutionEngine:_LoadTaskGraph'):
    tasks = Task.query(ancestor=job.key).fetch()
    # We find the terminal tasks by looking for tasks that nothing else
    # depends on.
    has_dependents = set()
    for task in tasks:
      has_dependents |= set(task.dependencies)
    terminal_tasks = [t.key for t in tasks if t.key not in has_dependents]
    return ReconstitutedTaskGraph(
        terminal_tasks=terminal_tasks, tasks={task.key: task for task in tasks})


class NoopAction(object):

  @staticmethod
  def __str__():
    return 'NoopAction()'

  @staticmethod
  def __call__(_):
    pass


@ndb.non_transactional
@timing.TimeWall('ExecutionEngine:Evaluate')
def Evaluate(job, event, evaluator):
  """Applies an evaluator to each task in the task graph, given an event.

  This function implements a depth-first search traversal of the task graph
  and applies the `evaluator` to each task in post-order. We start the DFS
  from the terminal tasks (those that no other task depends on) and call the
  `evaluator` function with a representation of the task in the graph, the
  `event` as input, and an accumulator argument.

  The `evaluator` must be a callable which accepts three arguments:

    - task: an InMemoryTask instance, representing a task in the graph.
    - event: an object whose shape/type is defined by the caller of the
      `Evaluate` function and that the evaluator can handle.
    - accumulator: a mutable dictionary which is valid for the scope of a
      single traversal of the graph.

  The `evaluator` must return either None or an iterable of callables, each of
  which takes a single argument: the accumulator at the end of a traversal.

  Events are free-form, but are usually dictionaries which constitute inputs
  that are external to the task graph evaluation. They could model events in
  an event-driven evaluation of tasks, or synthetic inputs to the system. What
  matters is that the `event` is understood by the evaluator implementation;
  it is provided as-is to the evaluator by this function.

  The Evaluate function keeps iterating while the evaluator still produces
  actions. When there are no more actions to run, Evaluate returns the most
  recent traversal's accumulator.
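
  For illustration, an evaluator that just collects every task's status and
  produces no actions might look like this (the event dict here is made up):

    def StatusEvaluator(task, event, accumulator):
      del event  # Unused by this evaluator.
      accumulator[task.id] = task.status
      return None

    statuses = Evaluate(job, {'type': 'initiate'}, StatusEvaluator)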
321  """
322  if job is None:
323    raise ValueError('job must not be None.')
324
325  accumulator = {}
326  actions = [NoopAction()]
327  while actions:
328    for action in actions:
329      logging.debug('Running action: %s', action)
330      # Each action should be a callable which takes the accumulator as an
331      # input. We want to run each action in their own transaction as well.
332      # This must not be called in a transaction.
333      with timing.WallTimeLogger('ExecutionEngine:ActionRunner<%s>' %
334                                 (type(action).__name__,)):
335        action(accumulator)
336
337    # Clear the actions and accumulator for this traversal.
338    del actions[:]
339    accumulator.clear()
340
341    # Load the graph transactionally.
342    graph = _LoadTaskGraph(job)
343
344    if not graph.tasks:
345      logging.debug('Task graph empty for job %s', job.job_id)
346      return
347
348    # First get all the "terminal" tasks, and traverse the dependencies in a
349    # depth-first-search.
350    task_stack = [graph.tasks[task] for task in graph.terminal_tasks]
351
352    # If the stack is empty, we should start at an arbitrary point.
353    if not task_stack:
354      task_stack = [graph.tasks.values()[0]]
355    vertex_states = {}
356    while task_stack:
357      task = task_stack[-1]
358      state = vertex_states.get(task.key, NOT_EVALUATED)
359      if state == CHILDREN_PENDING:
360        in_memory_task = task.ToInMemoryTask()
361        result_actions = evaluator(in_memory_task, event, accumulator)
362        if result_actions:
363          actions.extend(result_actions)
364        vertex_states[task.key] = EVALUATION_DONE
365      elif state == NOT_EVALUATED:
366        # This vertex is coloured white, we should traverse the dependencies.
367        vertex_states[task.key] = CHILDREN_PENDING
368        for dependency in task.dependencies:
369          if vertex_states.get(dependency, NOT_EVALUATED) == NOT_EVALUATED:
370            task_stack.append(graph.tasks[dependency])
371      else:
372        assert state == EVALUATION_DONE
373        task_stack.pop()
374
375  return accumulator
376