1# -*- coding: utf-8 -*- #
2# Copyright 2020 Google LLC. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#    http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Utilities for interacting with streaming logs."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import unicode_literals
20
21import copy
22
23from apitools.base.py import encoding
24from googlecloudsdk.command_lib.logs import stream
25import six
26
# Display projection for the dictionaries produced by _SplitMultiline: the
# severity, the timestamp rendered in local time, the task name, and the
# message text, in that order.
LOG_FORMAT = (
    'value(severity,'
    'timestamp.date("%Y-%m-%d %H:%M:%S %z",tz="LOCAL"), task_name,message)')
# Handed to stream.LogFetcher as its continue_interval.
_CONTINUE_INTERVAL = 10
34
35
def StreamLogs(name, continue_function, polling_interval, task_name,
               allow_multiline):
  """Streams the logs of an ml job, identified by its id.

  Args:
    name: string id of the entity whose logs are fetched.
    continue_function: One-arg function that takes in the number of empty polls
      and outputs a boolean to decide if we should keep polling or not. If not
      given, keep polling indefinitely.
    polling_interval: amount of time to sleep between each poll.
    task_name: String name of task; when empty, logs of all tasks are fetched.
    allow_multiline: Tells us if logs with multiline messages are okay or not.

  Returns:
    A generator of log dictionaries as produced by _SplitMultiline.
  """
  fetch_filters = _LogFilters(name, task_name=task_name)
  fetcher = stream.LogFetcher(
      filters=fetch_filters,
      polling_interval=polling_interval,
      continue_interval=_CONTINUE_INTERVAL,
      continue_func=continue_function)
  raw_entries = fetcher.YieldLogs()
  return _SplitMultiline(raw_entries, allow_multiline)
55
56
def _LogFilters(name, task_name):
  """Builds the logging filters that select a single job's log entries.

  Args:
    name: string id of the entity (the job id).
    task_name: String name of task. When falsy, no task filter is added.

  Returns:
    A list of filter strings to be passed to the logging API: the resource
    type and job id filters, followed by the task filter if any.
  """
  task_filters = (['resource.labels.task_name="{0}"'.format(task_name)]
                  if task_name else [])
  base_filters = [
      'resource.type="ml_job"',
      'resource.labels.job_id="{0}"'.format(name),
  ]
  return base_filters + task_filters
73
74
def _SplitMultiline(log_generator, allow_multiline=False):
  """Converts log entries to dicts, optionally splitting multiline messages.

  Args:
    log_generator: iterator that yields ml log entries; each one is converted
      with _EntryToDict.
    allow_multiline: Tells us if logs with multiline messages are okay or not.
      When True, each entry is yielded as a single dict with its message
      intact.

  Yields:
    ml log dictionaries. When allow_multiline is False, one dictionary is
    yielded per line of the entry's message, each with a single-line
    'message'.
  """
  for log in log_generator:
    log_dict = _EntryToDict(log)
    if allow_multiline:
      yield log_dict
      continue
    # An empty message still yields one (blank) line so the entry's other
    # fields are not dropped.
    for line in log_dict['message'].splitlines() or ['']:
      # Only the top-level 'message' key differs between the yielded dicts, so
      # a shallow copy is sufficient (the original used a per-line deepcopy).
      single_line_log = dict(log_dict)
      single_line_log['message'] = line
      yield single_line_log
97
98
def _EntryToDict(log_entry):
  """Converts a log entry to a flat dictionary.

  Args:
    log_entry: A log entry message. Its severity, timestamp, resource,
      jsonPayload and textPayload fields are read.

  Returns:
    A dict with keys 'severity', 'timestamp', 'task_name' and 'message'.
    'severity' is the severity enum's name, or 'DEFAULT' when unset.
    'message' is the JSON payload's 'message' field when a JSON payload is
    present, otherwise the text payload, or '' if neither yields text.
  """
  output = {}
  # An unset enum field is None, so guard before dereferencing .name.
  output['severity'] = (log_entry.severity.name
                        if log_entry.severity is not None else 'DEFAULT')
  output['timestamp'] = log_entry.timestamp
  output['task_name'] = _GetTaskName(log_entry)
  message = ''
  if log_entry.jsonPayload is not None:
    json_data = _ToDict(log_entry.jsonPayload)
    # 'message' contains a free-text message that we want to pull out of the
    # JSON. Coerce to text so non-string values don't break later
    # string handling (e.g. splitlines in _SplitMultiline).
    json_message = json_data.get('message')
    if json_message:
      message = six.text_type(json_message)
  elif log_entry.textPayload is not None:
    message = six.text_type(log_entry.textPayload)
  output['message'] = message
  return output
117
118
def _GetTaskName(log_entry):
  """Returns the task name stored in a log entry's resource labels.

  Args:
    log_entry: A log entry message; only its resource (and that resource's
      labels) are read.

  Returns:
    The 'task_name' resource label, or 'unknown_task' when the entry has no
    resource or the label is missing or empty.
  """
  if log_entry.resource:
    labels = _ToDict(log_entry.resource.labels)
  else:
    labels = {}
  task_name = labels.get('task_name')
  return task_name if task_name else 'unknown_task'
125
126
def _ToDict(message):
  """Normalizes an apitools message, a dict, or an empty value to a dict.

  Args:
    message: An apitools message, a dict, or a falsy value such as None.

  Returns:
    A new empty dict when message is falsy; message itself when it is already
    a dict; otherwise the result of encoding.MessageToDict(message).
  """
  if message:
    if isinstance(message, dict):
      return message
    return encoding.MessageToDict(message)
  return {}
134