1 /*
2  * Copyright (c) 2015 Balabit
3  * Copyright (c) 2015 Tibor Benke
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * As an additional exemption you are allowed to compile & link against the
19  * OpenSSL libraries as published by the OpenSSL project. See the file
20  * COPYING for details.
21  *
22  */
23 
24 package org.syslog_ng.kafka;
25 
26 import org.apache.kafka.clients.producer.Callback;
27 import org.apache.kafka.clients.producer.KafkaProducer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.apache.kafka.clients.producer.RecordMetadata;
30 import org.apache.log4j.Logger;
31 import org.syslog_ng.LogMessage;
32 import org.syslog_ng.StructuredLogDestination;
33 import org.syslog_ng.logging.SyslogNgInternalLogger;
34 import org.syslog_ng.options.InvalidOptionException;
35 import org.syslog_ng.kafka.KafkaDestinationOptions;
36 
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Future;
39 
40 public class KafkaDestination extends StructuredLogDestination {
41   private KafkaProducer<String, String> producer;
42   private KafkaDestinationProperties properties;
43   private boolean isOpened = false;
44   private Logger logger;
45   private KafkaDestinationOptions options;
46 
KafkaDestination(long handle)47   public KafkaDestination(long handle) {
48     super(handle);
49 
50     Logger.getLogger("org.apache.kafka").setLevel(SyslogNgInternalLogger.getLevel());
51     logger = Logger.getRootLogger();
52     SyslogNgInternalLogger.register(logger);
53 
54     options = new KafkaDestinationOptions(this);
55     properties = new KafkaDestinationProperties(options);
56   }
57 
58     @Override
getNameByUniqOptions()59     public String getNameByUniqOptions() {
60       return String.format("KafkaDestination,%s,%s", options.getKafkaBootstrapServers(), options.getTopic().getValue());
61     }
62 
63   @Override
send(LogMessage logMessage)64   public int send(LogMessage logMessage) {
65     boolean result;
66 
67     String formattedKey = options.getKey().getResolvedString(logMessage);
68     String formattedTopic = options.getTopic().getResolvedString(logMessage);
69     String formattedMessage = options.getTemplate().getResolvedString(logMessage);
70 
71     ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(formattedTopic, formattedKey, formattedMessage);
72 
73     logger.debug("Outgoing message: " + producerRecord.toString());
74     if (options.getSyncSend()) {
75       result =  sendSynchronously(producerRecord);
76     } else
77       result = sendAsynchronously(producerRecord);
78 
79     if (result)
80         return SUCCESS;
81     else
82         return ERROR;
83   }
84 
sendSynchronously(ProducerRecord<String, String> producerRecord)85   private boolean sendSynchronously(ProducerRecord<String, String> producerRecord) {
86     Future<RecordMetadata> future = producer.send(producerRecord);
87     return waitForReceive(future);
88   }
89 
sendAsynchronously(ProducerRecord<String, String> producerRecord)90   private boolean sendAsynchronously(ProducerRecord<String, String> producerRecord) {
91     Callback callback = new Callback() {
92       public void onCompletion(RecordMetadata metadata, Exception e) {
93         if(e != null) {
94           logger.error(e.getMessage());
95         }
96       }
97     };
98 
99     producer.send(producerRecord, callback);
100     return true;
101   }
102 
waitForReceive(Future<RecordMetadata> future)103   private boolean waitForReceive(Future<RecordMetadata> future) {
104     try {
105       future.get();
106       return true;
107     } catch (InterruptedException e) {
108       logSendError(e);
109     } catch (ExecutionException e) {
110       logSendError(e);
111     }
112 
113     return false;
114   }
115 
logSendError(Exception e)116   private void logSendError(Exception e) {
117     logger.debug("Unable to send the message at the moment");
118     logger.debug(e.getMessage());
119   }
120 
121   @Override
open()122   public boolean open() {
123     logger.debug("opening connection");
124     producer = new KafkaProducer<String,String>(properties.getProperties());
125     isOpened = true;
126     return true;
127   }
128 
129   @Override
close()130   public void close() {
131     logger.debug("closing connection");
132     producer.close();
133     isOpened = false;
134   }
135 
136   @Override
isOpened()137   public boolean isOpened() {
138     return isOpened;
139   }
140 
141   @Override
init()142   public boolean init() {
143     logger.debug("initializing");
144     try {
145       options.init();
146       properties.init();
147       return true;
148     } catch (InvalidOptionException e) {
149       logger.error(e.getMessage());
150       return false;
151     }
152   }
153 
154   @Override
deinit()155   public void deinit() {
156     logger.debug("deinitializing");
157     options.deinit();
158   }
159 }
160