1 /* 2 * Copyright (c) 2015 Balabit 3 * Copyright (c) 2015 Tibor Benke 4 * 5 * This program is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 as published 7 * by the Free Software Foundation, or (at your option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * GNU General Public License for more details. 13 * 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 17 * 18 * As an additional exemption you are allowed to compile & link against the 19 * OpenSSL libraries as published by the OpenSSL project. See the file 20 * COPYING for details. 21 * 22 */ 23 24 package org.syslog_ng.kafka; 25 26 import org.apache.kafka.clients.producer.Callback; 27 import org.apache.kafka.clients.producer.KafkaProducer; 28 import org.apache.kafka.clients.producer.ProducerRecord; 29 import org.apache.kafka.clients.producer.RecordMetadata; 30 import org.apache.log4j.Logger; 31 import org.syslog_ng.LogMessage; 32 import org.syslog_ng.StructuredLogDestination; 33 import org.syslog_ng.logging.SyslogNgInternalLogger; 34 import org.syslog_ng.options.InvalidOptionException; 35 import org.syslog_ng.kafka.KafkaDestinationOptions; 36 37 import java.util.concurrent.ExecutionException; 38 import java.util.concurrent.Future; 39 40 public class KafkaDestination extends StructuredLogDestination { 41 private KafkaProducer<String, String> producer; 42 private KafkaDestinationProperties properties; 43 private boolean isOpened = false; 44 private Logger logger; 45 private KafkaDestinationOptions options; 46 KafkaDestination(long handle)47 public KafkaDestination(long handle) { 48 super(handle); 49 50 Logger.getLogger("org.apache.kafka").setLevel(SyslogNgInternalLogger.getLevel()); 51 logger = Logger.getRootLogger(); 52 SyslogNgInternalLogger.register(logger); 53 54 options = new KafkaDestinationOptions(this); 55 properties = new KafkaDestinationProperties(options); 56 } 57 58 @Override getNameByUniqOptions()59 public String getNameByUniqOptions() { 60 return String.format("KafkaDestination,%s,%s", options.getKafkaBootstrapServers(), options.getTopic().getValue()); 61 } 62 63 @Override send(LogMessage logMessage)64 public int send(LogMessage logMessage) { 65 boolean result; 66 67 String formattedKey = options.getKey().getResolvedString(logMessage); 68 String formattedTopic = options.getTopic().getResolvedString(logMessage); 69 String formattedMessage = options.getTemplate().getResolvedString(logMessage); 70 71 ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(formattedTopic, formattedKey, formattedMessage); 72 73 logger.debug("Outgoing message: " + producerRecord.toString()); 74 if (options.getSyncSend()) { 75 result = sendSynchronously(producerRecord); 76 } else 77 result = sendAsynchronously(producerRecord); 78 79 if (result) 80 return SUCCESS; 81 else 82 return ERROR; 83 } 84 sendSynchronously(ProducerRecord<String, String> producerRecord)85 private boolean sendSynchronously(ProducerRecord<String, String> producerRecord) { 86 Future<RecordMetadata> future = producer.send(producerRecord); 87 return waitForReceive(future); 88 } 89 sendAsynchronously(ProducerRecord<String, String> producerRecord)90 private boolean sendAsynchronously(ProducerRecord<String, String> producerRecord) { 91 Callback callback = new Callback() { 92 public void onCompletion(RecordMetadata metadata, Exception e) { 93 if(e != null) { 94 logger.error(e.getMessage()); 95 } 96 } 97 }; 98 99 producer.send(producerRecord, callback); 100 return true; 101 } 102 waitForReceive(Future<RecordMetadata> future)103 private boolean waitForReceive(Future<RecordMetadata> future) { 104 try { 105 future.get(); 106 return true; 107 } catch (InterruptedException e) { 108 logSendError(e); 109 } catch (ExecutionException e) { 110 logSendError(e); 111 } 112 113 return false; 114 } 115 logSendError(Exception e)116 private void logSendError(Exception e) { 117 logger.debug("Unable to send the message at the moment"); 118 logger.debug(e.getMessage()); 119 } 120 121 @Override open()122 public boolean open() { 123 logger.debug("opening connection"); 124 producer = new KafkaProducer<String,String>(properties.getProperties()); 125 isOpened = true; 126 return true; 127 } 128 129 @Override close()130 public void close() { 131 logger.debug("closing connection"); 132 producer.close(); 133 isOpened = false; 134 } 135 136 @Override isOpened()137 public boolean isOpened() { 138 return isOpened; 139 } 140 141 @Override init()142 public boolean init() { 143 logger.debug("initializing"); 144 try { 145 options.init(); 146 properties.init(); 147 return true; 148 } catch (InvalidOptionException e) { 149 logger.error(e.getMessage()); 150 return false; 151 } 152 } 153 154 @Override deinit()155 public void deinit() { 156 logger.debug("deinitializing"); 157 options.deinit(); 158 } 159 } 160