1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Transport for Python logging handler
16
17Uses a background worker to log to Cloud Logging asynchronously.
18"""
19
20from __future__ import print_function
21
22import atexit
23import datetime
24import logging
25import queue
26import sys
27import threading
28import time
29
30from google.cloud.logging_v2 import _helpers
31from google.cloud.logging_v2.handlers.transports.base import Transport
32
33_DEFAULT_GRACE_PERIOD = 5.0  # Seconds
34_DEFAULT_MAX_BATCH_SIZE = 10
35_DEFAULT_MAX_LATENCY = 0  # Seconds
36_WORKER_THREAD_NAME = "google.cloud.logging.Worker"
37_WORKER_TERMINATOR = object()
38_LOGGER = logging.getLogger(__name__)
39
40
41def _get_many(queue_, *, max_items=None, max_latency=0):
42    """Get multiple items from a Queue.
43
44    Gets at least one (blocking) and at most ``max_items`` items
45    (non-blocking) from a given Queue. Does not mark the items as done.
46
47    Args:
48        queue_ (queue.Queue): The Queue to get items from.
49        max_items (Optional[int]): The maximum number of items to get.
50            If ``None``, then all available items in the queue are returned.
51        max_latency (Optional[float]): The maximum number of seconds to wait
52            for more than one item from a queue. This number includes
53            the time required to retrieve the first item.
54
55    Returns:
56        list: items retrieved from the queue
57    """
58    start = time.time()
59    # Always return at least one item.
60    items = [queue_.get()]
61    while max_items is None or len(items) < max_items:
62        try:
63            elapsed = time.time() - start
64            timeout = max(0, max_latency - elapsed)
65            items.append(queue_.get(timeout=timeout))
66        except queue.Empty:
67            break
68    return items
69
70
71class _Worker(object):
72    """A background thread that writes batches of log entries."""
73
74    def __init__(
75        self,
76        cloud_logger,
77        *,
78        grace_period=_DEFAULT_GRACE_PERIOD,
79        max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
80        max_latency=_DEFAULT_MAX_LATENCY,
81    ):
82        """
83        Args:
84            cloud_logger (logging_v2.logger.Logger):
85                The logger to send entries to.
86            grace_period (Optional[float]): The amount of time to wait for pending logs to
87                be submitted when the process is shutting down.
88            max_batch (Optional[int]): The maximum number of items to send at a time
89                in the background thread.
90            max_latency (Optional[float]): The amount of time to wait for new logs before
91                sending a new batch. It is strongly recommended to keep this smaller
92                than the grace_period. This means this is effectively the longest
93                amount of time the background thread will hold onto log entries
94                before sending them to the server.
95        """
96        self._cloud_logger = cloud_logger
97        self._grace_period = grace_period
98        self._max_batch_size = max_batch_size
99        self._max_latency = max_latency
100        self._queue = queue.Queue(0)
101        self._operational_lock = threading.Lock()
102        self._thread = None
103
104    @property
105    def is_alive(self):
106        """Returns True is the background thread is running."""
107        return self._thread is not None and self._thread.is_alive()
108
109    def _safely_commit_batch(self, batch):
110        total_logs = len(batch.entries)
111
112        try:
113            if total_logs > 0:
114                batch.commit()
115                _LOGGER.debug("Submitted %d logs", total_logs)
116        except Exception:
117            _LOGGER.error("Failed to submit %d logs.", total_logs, exc_info=True)
118
119    def _thread_main(self):
120        """The entry point for the worker thread.
121
122        Pulls pending log entries off the queue and writes them in batches to
123        the Cloud Logger.
124        """
125        _LOGGER.debug("Background thread started.")
126
127        done = False
128        while not done:
129            batch = self._cloud_logger.batch()
130            items = _get_many(
131                self._queue,
132                max_items=self._max_batch_size,
133                max_latency=self._max_latency,
134            )
135
136            for item in items:
137                if item is _WORKER_TERMINATOR:
138                    done = True  # Continue processing items.
139                else:
140                    batch.log_struct(**item)
141
142            self._safely_commit_batch(batch)
143
144            for _ in items:
145                self._queue.task_done()
146
147        _LOGGER.debug("Background thread exited gracefully.")
148
149    def start(self):
150        """Starts the background thread.
151
152        Additionally, this registers a handler for process exit to attempt
153        to send any pending log entries before shutdown.
154        """
155        with self._operational_lock:
156            if self.is_alive:
157                return
158
159            self._thread = threading.Thread(
160                target=self._thread_main, name=_WORKER_THREAD_NAME
161            )
162            self._thread.daemon = True
163            self._thread.start()
164            atexit.register(self._main_thread_terminated)
165
166    def stop(self, *, grace_period=None):
167        """Signals the background thread to stop.
168
169        This does not terminate the background thread. It simply queues the
170        stop signal. If the main process exits before the background thread
171        processes the stop signal, it will be terminated without finishing
172        work. The ``grace_period`` parameter will give the background
173        thread some time to finish processing before this function returns.
174
175        Args:
176            grace_period (Optional[float]): If specified, this method will
177                block up to this many seconds to allow the background thread
178                to finish work before returning.
179
180        Returns:
181            bool: True if the thread terminated. False if the thread is still
182            running.
183        """
184        if not self.is_alive:
185            return True
186
187        with self._operational_lock:
188            self._queue.put_nowait(_WORKER_TERMINATOR)
189
190            if grace_period is not None:
191                print("Waiting up to %d seconds." % (grace_period,), file=sys.stderr)
192
193            self._thread.join(timeout=grace_period)
194
195            # Check this before disowning the thread, because after we disown
196            # the thread is_alive will be False regardless of if the thread
197            # exited or not.
198            success = not self.is_alive
199
200            self._thread = None
201
202            return success
203
204    def _main_thread_terminated(self):
205        """Callback that attempts to send pending logs before termination."""
206        if not self.is_alive:
207            return
208
209        if not self._queue.empty():
210            print(
211                "Program shutting down, attempting to send %d queued log "
212                "entries to Cloud Logging..." % (self._queue.qsize(),),
213                file=sys.stderr,
214            )
215
216        if self.stop(grace_period=self._grace_period):
217            print("Sent all pending logs.", file=sys.stderr)
218        else:
219            print(
220                "Failed to send %d pending logs." % (self._queue.qsize(),),
221                file=sys.stderr,
222            )
223
224    def enqueue(self, record, message, **kwargs):
225        """Queues a log entry to be written by the background thread.
226
227        Args:
228            record (logging.LogRecord): Python log record that the handler was called with.
229            message (str): The message from the ``LogRecord`` after being
230                        formatted by the associated log formatters.
231            kwargs: Additional optional arguments for the logger
232        """
233        queue_entry = {
234            "info": {"message": message, "python_logger": record.name},
235            "severity": _helpers._normalize_severity(record.levelno),
236            "timestamp": datetime.datetime.utcfromtimestamp(record.created),
237        }
238        queue_entry.update(kwargs)
239        self._queue.put_nowait(queue_entry)
240
241    def flush(self):
242        """Submit any pending log records."""
243        self._queue.join()
244
245
246class BackgroundThreadTransport(Transport):
247    """Asynchronous transport that uses a background thread."""
248
249    def __init__(
250        self,
251        client,
252        name,
253        *,
254        grace_period=_DEFAULT_GRACE_PERIOD,
255        batch_size=_DEFAULT_MAX_BATCH_SIZE,
256        max_latency=_DEFAULT_MAX_LATENCY,
257    ):
258        """
259        Args:
260            client (~logging_v2.client.Client):
261                The Logging client.
262            name (str): The name of the lgoger.
263            grace_period (Optional[float]): The amount of time to wait for pending logs to
264                be submitted when the process is shutting down.
265            batch_size (Optional[int]): The maximum number of items to send at a time in the
266                background thread.
267            max_latency (Optional[float]): The amount of time to wait for new logs before
268                sending a new batch. It is strongly recommended to keep this smaller
269                than the grace_period. This means this is effectively the longest
270                amount of time the background thread will hold onto log entries
271                before sending them to the server.
272        """
273        self.client = client
274        logger = self.client.logger(name)
275        self.worker = _Worker(
276            logger,
277            grace_period=grace_period,
278            max_batch_size=batch_size,
279            max_latency=max_latency,
280        )
281        self.worker.start()
282
283    def send(self, record, message, **kwargs):
284        """Overrides Transport.send().
285
286        Args:
287            record (logging.LogRecord): Python log record that the handler was called with.
288            message (str): The message from the ``LogRecord`` after being
289                formatted by the associated log formatters.
290            kwargs: Additional optional arguments for the logger
291        """
292        self.worker.enqueue(record, message, **kwargs)
293
294    def flush(self):
295        """Submit any pending log records."""
296        self.worker.flush()
297