1# Copyright 2019 The Matrix.org Foundation C.I.C.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import warnings
17from io import StringIO
18from unittest.mock import Mock
19
20from pyperf import perf_counter
21
22from twisted.internet.defer import Deferred
23from twisted.internet.protocol import ServerFactory
24from twisted.logger import LogBeginner, LogPublisher
25from twisted.protocols.basic import LineOnlyReceiver
26
27from synapse.config.logger import _setup_stdlib_logging
28from synapse.logging import RemoteHandler
29from synapse.util import Clock
30
31
32class LineCounter(LineOnlyReceiver):
33
34    delimiter = b"\n"
35
36    def __init__(self, *args, **kwargs):
37        self.count = 0
38        super().__init__(*args, **kwargs)
39
40    def lineReceived(self, line):
41        self.count += 1
42
43        if self.count >= self.factory.wait_for and self.factory.on_done:
44            on_done = self.factory.on_done
45            self.factory.on_done = None
46            on_done.callback(True)
47
48
49async def main(reactor, loops):
50    """
51    Benchmark how long it takes to send `loops` messages.
52    """
53    servers = []
54
55    def protocol():
56        p = LineCounter()
57        servers.append(p)
58        return p
59
60    logger_factory = ServerFactory.forProtocol(protocol)
61    logger_factory.wait_for = loops
62    logger_factory.on_done = Deferred()
63    port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1")
64
65    # A fake homeserver config.
66    class Config:
67        server_name = "synmark-" + str(loops)
68        no_redirect_stdio = True
69
70    hs_config = Config()
71
72    # To be able to sleep.
73    clock = Clock(reactor)
74
75    errors = StringIO()
76    publisher = LogPublisher()
77    mock_sys = Mock()
78    beginner = LogBeginner(
79        publisher, errors, mock_sys, warnings, initialBufferSize=loops
80    )
81
82    log_config = {
83        "version": 1,
84        "loggers": {"synapse": {"level": "DEBUG", "handlers": ["tersejson"]}},
85        "formatters": {"tersejson": {"class": "synapse.logging.TerseJsonFormatter"}},
86        "handlers": {
87            "tersejson": {
88                "class": "synapse.logging.RemoteHandler",
89                "host": "127.0.0.1",
90                "port": port.getHost().port,
91                "maximum_buffer": 100,
92                "_reactor": reactor,
93            }
94        },
95    }
96
97    logger = logging.getLogger("synapse.logging.test_terse_json")
98    _setup_stdlib_logging(
99        hs_config,
100        log_config,
101        logBeginner=beginner,
102    )
103
104    # Wait for it to connect...
105    for handler in logging.getLogger("synapse").handlers:
106        if isinstance(handler, RemoteHandler):
107            break
108    else:
109        raise RuntimeError("Improperly configured: no RemoteHandler found.")
110
111    await handler._service.whenConnected()
112
113    start = perf_counter()
114
115    # Send a bunch of useful messages
116    for i in range(0, loops):
117        logger.info("test message %s", i)
118
119        if len(handler._buffer) == handler.maximum_buffer:
120            while len(handler._buffer) > handler.maximum_buffer / 2:
121                await clock.sleep(0.01)
122
123    await logger_factory.on_done
124
125    end = perf_counter() - start
126
127    handler.close()
128    port.stopListening()
129
130    return end
131