1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17package org.apache.spark.streaming.flume.sink
18
19import org.slf4j.{Logger, LoggerFactory}
20
21/**
22 * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
23 * The org.apache.spark.Logging is not used so that all of Spark is not brought
24 * in as a dependency.
25 */
26private[sink] trait Logging {
27  // Make the log field transient so that objects with Logging can
28  // be serialized and used on another machine
29  @transient private var _log: Logger = null
30
31  // Method to get or create the logger for this object
32  protected def log: Logger = {
33    if (_log == null) {
34      initializeIfNecessary()
35      var className = this.getClass.getName
36      // Ignore trailing $'s in the class names for Scala objects
37      if (className.endsWith("$")) {
38        className = className.substring(0, className.length - 1)
39      }
40      _log = LoggerFactory.getLogger(className)
41    }
42    _log
43  }
44
45  // Log methods that take only a String
46  protected def logInfo(msg: => String) {
47    if (log.isInfoEnabled) log.info(msg)
48  }
49
50  protected def logDebug(msg: => String) {
51    if (log.isDebugEnabled) log.debug(msg)
52  }
53
54  protected def logTrace(msg: => String) {
55    if (log.isTraceEnabled) log.trace(msg)
56  }
57
58  protected def logWarning(msg: => String) {
59    if (log.isWarnEnabled) log.warn(msg)
60  }
61
62  protected def logError(msg: => String) {
63    if (log.isErrorEnabled) log.error(msg)
64  }
65
66  // Log methods that take Throwables (Exceptions/Errors) too
67  protected def logInfo(msg: => String, throwable: Throwable) {
68    if (log.isInfoEnabled) log.info(msg, throwable)
69  }
70
71  protected def logDebug(msg: => String, throwable: Throwable) {
72    if (log.isDebugEnabled) log.debug(msg, throwable)
73  }
74
75  protected def logTrace(msg: => String, throwable: Throwable) {
76    if (log.isTraceEnabled) log.trace(msg, throwable)
77  }
78
79  protected def logWarning(msg: => String, throwable: Throwable) {
80    if (log.isWarnEnabled) log.warn(msg, throwable)
81  }
82
83  protected def logError(msg: => String, throwable: Throwable) {
84    if (log.isErrorEnabled) log.error(msg, throwable)
85  }
86
87  protected def isTraceEnabled(): Boolean = {
88    log.isTraceEnabled
89  }
90
91  private def initializeIfNecessary() {
92    if (!Logging.initialized) {
93      Logging.initLock.synchronized {
94        if (!Logging.initialized) {
95          initializeLogging()
96        }
97      }
98    }
99  }
100
101  private def initializeLogging() {
102    Logging.initialized = true
103
104    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
105    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
106    log
107  }
108}
109
110private[sink] object Logging {
111  @volatile private var initialized = false
112  val initLock = new Object()
113  try {
114    // We use reflection here to handle the case where users remove the
115    // slf4j-to-jul bridge order to route their logs to JUL.
116    // scalastyle:off classforname
117    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
118    // scalastyle:on classforname
119    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
120    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
121    if (!installed) {
122      bridgeClass.getMethod("install").invoke(null)
123    }
124  } catch {
125    case e: ClassNotFoundException => // can't log anything yet so just fail silently
126  }
127}
128