1# -*- coding: utf-8 -*- # 2# Copyright 2020 Google LLC. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Utilities for interacting with streaming logs.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import unicode_literals 20 21import copy 22 23from apitools.base.py import encoding 24from googlecloudsdk.command_lib.logs import stream 25import six 26 27LOG_FORMAT = ('value(' 28 'severity,' 29 'timestamp.date("%Y-%m-%d %H:%M:%S %z",tz="LOCAL"), ' 30 'task_name,' 31 'message' 32 ')') 33_CONTINUE_INTERVAL = 10 34 35 36def StreamLogs(name, continue_function, polling_interval, task_name, 37 allow_multiline): 38 """Returns the streaming log of the job by id. 39 40 Args: 41 name: string id of the entity. 42 continue_function: One-arg function that takes in the number of empty polls 43 and outputs a boolean to decide if we should keep polling or not. If not 44 given, keep polling indefinitely. 45 polling_interval: amount of time to sleep between each poll. 46 task_name: String name of task. 47 allow_multiline: Tells us if logs with multiline messages are okay or not. 48 """ 49 log_fetcher = stream.LogFetcher( 50 filters=_LogFilters(name, task_name=task_name), 51 polling_interval=polling_interval, 52 continue_interval=_CONTINUE_INTERVAL, 53 continue_func=continue_function) 54 return _SplitMultiline(log_fetcher.YieldLogs(), allow_multiline) 55 56 57def _LogFilters(name, task_name): 58 """Returns filters for log fetcher to use. 59 60 Args: 61 name: string id of the entity. 62 task_name: String name of task. 63 64 Returns: 65 A list of filters to be passed to the logging API. 66 """ 67 filters = [ 68 'resource.type="ml_job"', 'resource.labels.job_id="{0}"'.format(name) 69 ] 70 if task_name: 71 filters.append('resource.labels.task_name="{0}"'.format(task_name)) 72 return filters 73 74 75def _SplitMultiline(log_generator, allow_multiline=False): 76 """Splits the dict output of logs into multiple lines. 77 78 Args: 79 log_generator: iterator that returns a an ml log in dict format. 80 allow_multiline: Tells us if logs with multiline messages are okay or not. 81 82 Yields: 83 Single-line ml log dictionaries. 84 """ 85 for log in log_generator: 86 log_dict = _EntryToDict(log) 87 messages = log_dict['message'].splitlines() 88 if allow_multiline: 89 yield log_dict 90 else: 91 if not messages: 92 messages = [''] 93 for message in messages: 94 single_line_log = copy.deepcopy(log_dict) 95 single_line_log['message'] = message 96 yield single_line_log 97 98 99def _EntryToDict(log_entry): 100 """Converts a log entry to a dictionary.""" 101 output = {} 102 output['severity'] = log_entry.severity.name 103 output['timestamp'] = log_entry.timestamp 104 output['task_name'] = _GetTaskName(log_entry) 105 message = [] 106 if log_entry.jsonPayload is not None: 107 json_data = _ToDict(log_entry.jsonPayload) 108 # 'message' contains a free-text message that we want to pull out of the 109 # JSON. 110 if 'message' in json_data: 111 if json_data['message']: 112 message.append(json_data['message']) 113 elif log_entry.textPayload is not None: 114 message.append(six.text_type(log_entry.textPayload)) 115 output['message'] = ''.join(message) 116 return output 117 118 119def _GetTaskName(log_entry): 120 """Reads the label attributes of the given log entry.""" 121 resource_labels = {} if not log_entry.resource else _ToDict( 122 log_entry.resource.labels) 123 return 'unknown_task' if not resource_labels.get( 124 'task_name') else resource_labels['task_name'] 125 126 127def _ToDict(message): 128 if not message: 129 return {} 130 if isinstance(message, dict): 131 return message 132 else: 133 return encoding.MessageToDict(message) 134