1# Copyright 2019 The Matrix.org Foundation C.I.C. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import logging 16import warnings 17from io import StringIO 18from unittest.mock import Mock 19 20from pyperf import perf_counter 21 22from twisted.internet.defer import Deferred 23from twisted.internet.protocol import ServerFactory 24from twisted.logger import LogBeginner, LogPublisher 25from twisted.protocols.basic import LineOnlyReceiver 26 27from synapse.config.logger import _setup_stdlib_logging 28from synapse.logging import RemoteHandler 29from synapse.util import Clock 30 31 32class LineCounter(LineOnlyReceiver): 33 34 delimiter = b"\n" 35 36 def __init__(self, *args, **kwargs): 37 self.count = 0 38 super().__init__(*args, **kwargs) 39 40 def lineReceived(self, line): 41 self.count += 1 42 43 if self.count >= self.factory.wait_for and self.factory.on_done: 44 on_done = self.factory.on_done 45 self.factory.on_done = None 46 on_done.callback(True) 47 48 49async def main(reactor, loops): 50 """ 51 Benchmark how long it takes to send `loops` messages. 52 """ 53 servers = [] 54 55 def protocol(): 56 p = LineCounter() 57 servers.append(p) 58 return p 59 60 logger_factory = ServerFactory.forProtocol(protocol) 61 logger_factory.wait_for = loops 62 logger_factory.on_done = Deferred() 63 port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") 64 65 # A fake homeserver config. 66 class Config: 67 server_name = "synmark-" + str(loops) 68 no_redirect_stdio = True 69 70 hs_config = Config() 71 72 # To be able to sleep. 73 clock = Clock(reactor) 74 75 errors = StringIO() 76 publisher = LogPublisher() 77 mock_sys = Mock() 78 beginner = LogBeginner( 79 publisher, errors, mock_sys, warnings, initialBufferSize=loops 80 ) 81 82 log_config = { 83 "version": 1, 84 "loggers": {"synapse": {"level": "DEBUG", "handlers": ["tersejson"]}}, 85 "formatters": {"tersejson": {"class": "synapse.logging.TerseJsonFormatter"}}, 86 "handlers": { 87 "tersejson": { 88 "class": "synapse.logging.RemoteHandler", 89 "host": "127.0.0.1", 90 "port": port.getHost().port, 91 "maximum_buffer": 100, 92 "_reactor": reactor, 93 } 94 }, 95 } 96 97 logger = logging.getLogger("synapse.logging.test_terse_json") 98 _setup_stdlib_logging( 99 hs_config, 100 log_config, 101 logBeginner=beginner, 102 ) 103 104 # Wait for it to connect... 105 for handler in logging.getLogger("synapse").handlers: 106 if isinstance(handler, RemoteHandler): 107 break 108 else: 109 raise RuntimeError("Improperly configured: no RemoteHandler found.") 110 111 await handler._service.whenConnected() 112 113 start = perf_counter() 114 115 # Send a bunch of useful messages 116 for i in range(0, loops): 117 logger.info("test message %s", i) 118 119 if len(handler._buffer) == handler.maximum_buffer: 120 while len(handler._buffer) > handler.maximum_buffer / 2: 121 await clock.sleep(0.01) 122 123 await logger_factory.on_done 124 125 end = perf_counter() - start 126 127 handler.close() 128 port.stopListening() 129 130 return end 131