1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21namespace MediaWiki\Logger\Monolog;
22
23use Kafka\MetaDataFromKafka;
24use Kafka\Produce;
25use Kafka\Protocol\Decoder;
26use MediaWiki\Logger\LoggerFactory;
27use Monolog\Handler\AbstractProcessingHandler;
28use Monolog\Logger;
29use Psr\Log\LoggerInterface;
30
/**
 * Log handler that sends log events to a Kafka server.
 *
 * Constructor options array arguments:
 * * alias: Map from Monolog channel to Kafka topic name. When no
 *   alias exists, the topic "monolog_$channel" will be used.
 * * swallowExceptions: Swallow exceptions that occur while talking to
 *   Kafka. Defaults to false.
 * * logExceptions: Log exceptions talking to Kafka here. Either null,
 *   the name of a channel to log to (resolved through LoggerFactory), or
 *   an object implementing Psr\Log\LoggerInterface. Defaults to null.
 * * requireAck: Only applied by factory(), which passes it to
 *   Produce::setRequireAck() when the option is set.
 *
 * Additional options only understood by factory():
 * * sendTimeout: Socket send timeout, in (possibly fractional) seconds.
 * * recvTimeout: Socket receive timeout, in (possibly fractional) seconds.
 *
 * Requires the nmred/kafka-php library, version >= 1.3.0
 *
 * @since 1.26
 * @author Erik Bernhardson <ebernhardson@wikimedia.org>
 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
 */
class KafkaHandler extends AbstractProcessingHandler {
	/**
	 * @var Produce Sends requests to kafka
	 */
	protected $produce;

	/**
	 * @var array Optional handler configuration
	 */
	protected $options;

	/**
	 * @var array Map from topic name to partition this request produces to
	 *  (null when no partition could be found for the topic)
	 */
	protected $partitions = [];

	/**
	 * @var array defaults for constructor options
	 */
	private const DEFAULT_OPTIONS = [
		'alias' => [], // map from monolog channel to kafka topic
		'swallowExceptions' => false, // swallow exceptions sending records
		'logExceptions' => null, // A PSR3 logger (or channel name) to inform about errors
		'requireAck' => 0,
	];

	/**
	 * @param Produce $produce Kafka instance to produce through
	 * @param array $options optional handler configuration, see class documentation
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 */
	public function __construct(
		Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
	) {
		parent::__construct( $level, $bubble );
		$this->produce = $produce;
		$this->options = array_merge( self::DEFAULT_OPTIONS, $options );

		// The documented contract allows a channel name here. Resolve it on
		// every construction path, not only in factory(), otherwise warnings
		// would be silently dropped by warning().
		if ( is_string( $this->options['logExceptions'] ) ) {
			$this->options['logExceptions'] = LoggerFactory::getInstance(
				$this->options['logExceptions']
			);
		}
	}

	/**
	 * Constructs the necessary support objects and returns a KafkaHandler
	 * instance.
	 *
	 * @param string[] $kafkaServers
	 * @param array $options See class documentation; additionally accepts
	 *  'sendTimeout' and 'recvTimeout' in (possibly fractional) seconds.
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 * @return KafkaHandler
	 */
	public static function factory(
		$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
	) {
		$metadata = new MetaDataFromKafka( $kafkaServers );
		$produce = new Produce( $metadata );

		// Timeouts are configured entirely through the microsecond stream
		// option; the seconds component is pinned to 0.
		$timeouts = [ 'sendTimeout' => 'Send', 'recvTimeout' => 'Recv' ];
		foreach ( $timeouts as $optionName => $prefix ) {
			if ( isset( $options[$optionName] ) ) {
				$produce->getClient()->setStreamOption( "{$prefix}TimeoutSec", 0 );
				$produce->getClient()->setStreamOption( "{$prefix}TimeoutUSec",
					intval( $options[$optionName] * 1000000 )
				);
			}
		}

		if ( isset( $options['requireAck'] ) ) {
			$produce->setRequireAck( $options['requireAck'] );
		}

		// A string 'logExceptions' is resolved to a logger by the constructor.
		return new self( $produce, $options, $level, $bubble );
	}

	/**
	 * Queue a single formatted record and send it immediately.
	 *
	 * Records whose formatted value is null are dropped.
	 *
	 * @inheritDoc
	 */
	protected function write( array $record ): void {
		if ( $record['formatted'] !== null ) {
			$this->addMessages( $record['channel'], [ $record['formatted'] ] );
			$this->send();
		}
	}

	/**
	 * Process, format and queue every record at or above the handler level,
	 * grouped by channel, then send them all in a single request.
	 *
	 * @inheritDoc
	 * @phan-param array[] $batch
	 */
	public function handleBatch( array $batch ): void {
		$channels = [];
		foreach ( $batch as $record ) {
			if ( $record['level'] < $this->level ) {
				continue;
			}
			$channels[$record['channel']][] = $this->processRecord( $record );
		}

		$formatter = $this->getFormatter();
		foreach ( $channels as $channel => $records ) {
			$messages = [];
			foreach ( $records as $record ) {
				$message = $formatter->format( $record );
				if ( $message !== null ) {
					$messages[] = $message;
				}
			}
			if ( $messages ) {
				$this->addMessages( $channel, $messages );
			}
		}

		$this->send();
	}

	/**
	 * Send any records in the kafka client internal queue.
	 *
	 * @throws \Kafka\Exception When sending fails and exceptions are not swallowed
	 * @throws \RuntimeException When Kafka reports per-partition errors and
	 *  exceptions are not swallowed
	 */
	protected function send() {
		try {
			$response = $this->produce->send();
		} catch ( \Kafka\Exception $e ) {
			$ignore = $this->warning(
				'Error sending records to kafka: {exception}',
				[ 'exception' => $e ] );
			if ( !$ignore ) {
				throw $e;
			}
			return;
		}

		// NOTE(review): Produce::send() can return a bool instead of a
		// per-topic response; there is nothing to inspect in that case.
		if ( is_bool( $response ) ) {
			return;
		}

		$errors = [];
		foreach ( $response as $topicName => $partitionResponse ) {
			foreach ( $partitionResponse as $info ) {
				if ( $info['errCode'] === 0 ) {
					// no error
					continue;
				}
				$errors[] = sprintf(
					'Error producing to %s (errno %d): %s',
					$topicName,
					$info['errCode'],
					Decoder::getError( $info['errCode'] )
				);
			}
		}

		if ( $errors ) {
			$error = implode( "\n", $errors );
			if ( !$this->warning( $error ) ) {
				throw new \RuntimeException( $error );
			}
		}
	}

	/**
	 * Pick (once per topic, then cache) a random available partition.
	 *
	 * A failed metadata lookup that is swallowed is not cached, so it is
	 * retried on the next call; an empty partition list is cached as null.
	 *
	 * @param string $topic Name of topic to get partition for
	 * @return int|null The random partition to produce to for this request,
	 *  or null if a partition could not be determined.
	 * @throws \Kafka\Exception When metadata lookup fails and exceptions are not swallowed
	 * @throws \RuntimeException When the topic has no partitions and exceptions
	 *  are not swallowed
	 */
	protected function getRandomPartition( $topic ) {
		if ( !array_key_exists( $topic, $this->partitions ) ) {
			try {
				$partitions = $this->produce->getAvailablePartitions( $topic );
			} catch ( \Kafka\Exception $e ) {
				$ignore = $this->warning(
					'Error getting metadata for kafka topic {topic}: {exception}',
					[ 'topic' => $topic, 'exception' => $e ] );
				if ( $ignore ) {
					return null;
				}
				throw $e;
			}
			if ( $partitions ) {
				$key = array_rand( $partitions );
				$this->partitions[$topic] = $partitions[$key];
			} else {
				$details = $this->produce->getClient()->getTopicDetail( $topic );
				$ignore = $this->warning(
					'No partitions available for kafka topic {topic}',
					[ 'topic' => $topic, 'kafka' => $details ]
				);
				if ( !$ignore ) {
					throw new \RuntimeException( "No partitions available for kafka topic $topic" );
				}
				$this->partitions[$topic] = null;
			}
		}
		return $this->partitions[$topic];
	}

	/**
	 * Adds records for a channel to the Kafka client internal queue.
	 *
	 * The topic is taken from the 'alias' option, falling back to
	 * "monolog_$channel". Records are dropped when no partition is available.
	 *
	 * @param string $channel Name of Monolog channel records belong to
	 * @param array $records List of records to append
	 */
	protected function addMessages( $channel, array $records ) {
		$topic = $this->options['alias'][$channel] ?? "monolog_$channel";
		$partition = $this->getRandomPartition( $topic );
		if ( $partition !== null ) {
			$this->produce->setMessages( $topic, $partition, $records );
		}
	}

	/**
	 * Report a problem to the configured 'logExceptions' logger, if any.
	 *
	 * @param string $message PSR3 compatible message string
	 * @param array $context PSR3 compatible log context
	 * @return bool true if caller should ignore warning
	 */
	protected function warning( $message, array $context = [] ) {
		if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
			$this->options['logExceptions']->warning( $message, $context );
		}
		// Cast so a truthy/falsy non-bool option value honours the bool contract.
		return (bool)$this->options['swallowExceptions'];
	}
}
277