<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

namespace MediaWiki\Logger\Monolog;

use Kafka\MetaDataFromKafka;
use Kafka\Produce;
use Kafka\Protocol\Decoder;
use MediaWiki\Logger\LoggerFactory;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;

/**
 * Log handler sends log events to a kafka server.
 *
 * Constructor options array arguments:
 * * alias: map from monolog channel to kafka topic name. When no
 *   alias exists the topic "monolog_$channel" will be used.
 * * swallowExceptions: Swallow exceptions that occur while talking to
 *   kafka. Defaults to false.
 * * logExceptions: Log exceptions talking to kafka here. Either null,
 *   the name of a channel to log to, or an object implementing
 *   Psr\Log\LoggerInterface. Defaults to null. Channel names are only
 *   resolved to a logger by factory(); a string passed directly to the
 *   constructor is ignored.
 * * requireAck: value passed to Produce::setRequireAck(). Defaults to 0.
 *   Only applied when the handler is built through factory().
 *
 * Additional options understood only by factory():
 * * sendTimeout: kafka client socket send timeout, in seconds (may be
 *   fractional).
 * * recvTimeout: kafka client socket receive timeout, in seconds (may be
 *   fractional).
 *
 * Requires the nmred/kafka-php library, version >= 1.3.0
 *
 * @since 1.26
 * @author Erik Bernhardson <ebernhardson@wikimedia.org>
 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
 */
class KafkaHandler extends AbstractProcessingHandler {
	/**
	 * @var Produce Sends requests to kafka
	 */
	protected $produce;

	/**
	 * @var array Optional handler configuration
	 */
	protected $options;

	/**
	 * @var array Map from topic name to partition this request produces to.
	 *  A null value means no partition was available for that topic.
	 */
	protected $partitions = [];

	/**
	 * @var array defaults for constructor options
	 */
	private const DEFAULT_OPTIONS = [
		'alias' => [], // map from monolog channel to kafka topic
		'swallowExceptions' => false, // swallow exceptions sending records
		'logExceptions' => null, // A PSR3 logger to inform about errors
		'requireAck' => 0,
	];

	/**
	 * @param Produce $produce Kafka instance to produce through
	 * @param array $options optional handler configuration (see class docs)
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 */
	public function __construct(
		Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
	) {
		parent::__construct( $level, $bubble );
		$this->produce = $produce;
		$this->options = array_merge( self::DEFAULT_OPTIONS, $options );
	}

	/**
	 * Constructs the necessary support objects and returns a KafkaHandler
	 * instance.
	 *
	 * Besides the constructor options, this also applies the factory-only
	 * 'sendTimeout' / 'recvTimeout' options, applies 'requireAck' to the
	 * producer, and resolves a string 'logExceptions' to a channel logger.
	 *
	 * @param string[] $kafkaServers Kafka servers used to fetch metadata
	 * @param array $options Handler configuration (see class docs)
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 * @return KafkaHandler
	 */
	public static function factory(
		$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
	) {
		$metadata = new MetaDataFromKafka( $kafkaServers );
		$produce = new Produce( $metadata );

		if ( isset( $options['sendTimeout'] ) ) {
			self::setStreamTimeout(
				$produce, 'SendTimeoutSec', 'SendTimeoutUSec', $options['sendTimeout']
			);
		}
		if ( isset( $options['recvTimeout'] ) ) {
			self::setStreamTimeout(
				$produce, 'RecvTimeoutSec', 'RecvTimeoutUSec', $options['recvTimeout']
			);
		}
		if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
			$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
		}

		if ( isset( $options['requireAck'] ) ) {
			$produce->setRequireAck( $options['requireAck'] );
		}

		return new self( $produce, $options, $level, $bubble );
	}

	/**
	 * Configure one kafka client socket timeout.
	 *
	 * The full timeout is expressed through the microsecond option and the
	 * seconds option is zeroed, so fractional second values are honoured.
	 *
	 * @param Produce $produce Producer whose client is configured
	 * @param string $secOption Name of the whole-seconds stream option
	 * @param string $usecOption Name of the microseconds stream option
	 * @param int|float $seconds Timeout in seconds
	 */
	private static function setStreamTimeout(
		Produce $produce, $secOption, $usecOption, $seconds
	) {
		$client = $produce->getClient();
		$client->setStreamOption( $secOption, 0 );
		$client->setStreamOption( $usecOption, intval( $seconds * 1000000 ) );
	}

	/**
	 * @inheritDoc
	 */
	protected function write( array $record ): void {
		// Records whose formatted form is null are not sent.
		if ( $record['formatted'] !== null ) {
			$this->addMessages( $record['channel'], [ $record['formatted'] ] );
			$this->send();
		}
	}

	/**
	 * Queue a whole batch grouped by channel, then send it in a single request.
	 *
	 * @inheritDoc
	 * @phan-param array[] $batch
	 */
	public function handleBatch( array $batch ): void {
		$channels = [];
		foreach ( $batch as $record ) {
			// Skip records below this handler's minimum level.
			if ( $record['level'] < $this->level ) {
				continue;
			}
			$channels[$record['channel']][] = $this->processRecord( $record );
		}

		$formatter = $this->getFormatter();
		foreach ( $channels as $channel => $records ) {
			$messages = [];
			foreach ( $records as $record ) {
				$message = $formatter->format( $record );
				if ( $message !== null ) {
					$messages[] = $message;
				}
			}
			if ( $messages ) {
				$this->addMessages( $channel, $messages );
			}
		}

		$this->send();
	}

	/**
	 * Send any records in the kafka client internal queue.
	 *
	 * @throws \Kafka\Exception When sending fails and exceptions are not swallowed
	 * @throws \RuntimeException When kafka reports per-partition errors and
	 *  exceptions are not swallowed
	 */
	protected function send() {
		try {
			$response = $this->produce->send();
		} catch ( \Kafka\Exception $e ) {
			$ignore = $this->warning(
				'Error sending records to kafka: {exception}',
				[ 'exception' => $e ] );
			if ( !$ignore ) {
				throw $e;
			}
			return;
		}

		// Only array responses are inspected for per-partition error codes.
		if ( is_bool( $response ) ) {
			return;
		}

		$errors = [];
		foreach ( $response as $topicName => $partitionResponse ) {
			foreach ( $partitionResponse as $info ) {
				if ( $info['errCode'] === 0 ) {
					// no error
					continue;
				}
				$errors[] = sprintf(
					'Error producing to %s (errno %d): %s',
					$topicName,
					$info['errCode'],
					Decoder::getError( $info['errCode'] )
				);
			}
		}

		if ( $errors ) {
			$error = implode( "\n", $errors );
			if ( !$this->warning( $error ) ) {
				throw new \RuntimeException( $error );
			}
		}
	}

	/**
	 * Pick (and cache) a random available partition for a topic.
	 *
	 * The chosen partition, or null when the topic has no available
	 * partitions, is cached for the lifetime of this handler. A metadata
	 * exception that is swallowed is NOT cached, so the lookup is retried
	 * on the next call.
	 *
	 * @param string $topic Name of topic to get partition for
	 * @return int|null The random partition to produce to for this request,
	 *  or null if a partition could not be determined.
	 * @throws \Kafka\Exception When metadata lookup fails and exceptions are not swallowed
	 * @throws \RuntimeException When no partitions are available and exceptions
	 *  are not swallowed
	 */
	protected function getRandomPartition( $topic ) {
		if ( !array_key_exists( $topic, $this->partitions ) ) {
			try {
				$partitions = $this->produce->getAvailablePartitions( $topic );
			} catch ( \Kafka\Exception $e ) {
				$ignore = $this->warning(
					'Error getting metadata for kafka topic {topic}: {exception}',
					[ 'topic' => $topic, 'exception' => $e ] );
				if ( $ignore ) {
					return null;
				}
				throw $e;
			}
			if ( $partitions ) {
				$key = array_rand( $partitions );
				$this->partitions[$topic] = $partitions[$key];
			} else {
				$details = $this->produce->getClient()->getTopicDetail( $topic );
				$ignore = $this->warning(
					'No partitions available for kafka topic {topic}',
					[ 'topic' => $topic, 'kafka' => $details ]
				);
				if ( !$ignore ) {
					throw new \RuntimeException( "No partitions available for kafka topic $topic" );
				}
				$this->partitions[$topic] = null;
			}
		}
		return $this->partitions[$topic];
	}

	/**
	 * Adds records for a channel to the Kafka client internal queue.
	 *
	 * If no partition can be determined for the channel's topic, the
	 * records are dropped without being queued.
	 *
	 * @param string $channel Name of Monolog channel records belong to
	 * @param array $records List of records to append
	 */
	protected function addMessages( $channel, array $records ) {
		$topic = $this->options['alias'][$channel] ?? "monolog_$channel";
		$partition = $this->getRandomPartition( $topic );
		if ( $partition !== null ) {
			$this->produce->setMessages( $topic, $partition, $records );
		}
	}

	/**
	 * Report a kafka problem to the configured PSR-3 logger, if any.
	 *
	 * @param string $message PSR3 compatible message string
	 * @param array $context PSR3 compatible log context
	 * @return bool true if caller should ignore warning
	 */
	protected function warning( $message, array $context = [] ) {
		if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
			$this->options['logExceptions']->warning( $message, $context );
		}
		return (bool)$this->options['swallowExceptions'];
	}
}