/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel

/**
 * :: DeveloperApi ::
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
 * should define the setup steps necessary to start receiving data,
 * and `onStop()` should define the cleanup steps necessary to stop receiving data.
 * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
 * or by stopping it completely with `stop(...)`.
 *
 * A custom receiver in Scala would look like this.
 *
 * {{{
 *  class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
 *      def onStart() {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors need to be handled.
 *
 *          // See corresponding method documentation for more details
 *      }
 *
 *      def onStop() {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *      }
 *  }
 * }}}
 *
 * A custom receiver in Java would look like this.
 *
 * {{{
 * class MyReceiver extends Receiver<String> {
 *     public MyReceiver(StorageLevel storageLevel) {
 *         super(storageLevel);
 *     }
 *
 *     public void onStart() {
 *         // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *         // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *         // Call store(...) in those threads to store received data into Spark's memory.
 *
 *         // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *         // different errors need to be handled.
 *
 *         // See corresponding method documentation for more details
 *     }
 *
 *     public void onStop() {
 *         // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *     }
 * }
 * }}}
 */
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  /**
   * This method is called by the system when the receiver is started. This function
   * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
   * This function must be non-blocking, so receiving the data must occur on a different
   * thread. Received data can be stored with Spark by calling `store(data)`.
   *
   * If there are errors in threads started here, then the following options can be taken:
   * (i) `reportError(...)` can be called to report the error to the driver.
   * The receiving of data will continue uninterrupted.
   * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
   * clean up all resources allocated (threads, buffers, etc.) during `onStart()`.
   * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
   * immediately, and then `onStart()` after a delay.
   */
  def onStart(): Unit

  /**
   * This method is called by the system when the receiver is stopped. All resources
   * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
   */
  def onStop(): Unit

  /** Override this to specify a preferred location (hostname). */
  def preferredLocation: Option[String] = None

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def store(dataBuffer: ArrayBuffer[T]) {
    supervisor.pushArrayBuffer(dataBuffer, None, None)
  }

  /**
   * Store an ArrayBuffer of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
    supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
  }

  /** Store an iterator of received data as a data block into Spark's memory. */
  def store(dataIterator: Iterator[T]) {
    supervisor.pushIterator(dataIterator, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(dataIterator: java.util.Iterator[T], metadata: Any) {
    supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
  }

  /** Store an iterator of received data as a data block into Spark's memory. */
  def store(dataIterator: java.util.Iterator[T]) {
    supervisor.pushIterator(dataIterator.asScala, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(dataIterator: Iterator[T], metadata: Any) {
    supervisor.pushIterator(dataIterator, Some(metadata), None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory. Note
   * that the data in the ByteBuffer must be serialized using the same serializer
   * that Spark is configured to use.
   */
  def store(bytes: ByteBuffer) {
    supervisor.pushBytes(bytes, None, None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
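   *
   * For illustration only, such a block might be stored from a receiver thread as in the
   * sketch below; `serialized` and the metadata map are hypothetical names, not part of
   * this class, and the bytes are assumed to come from Spark's configured serializer:
   *
   * {{{
   * // `serialized: ByteBuffer`, produced with the serializer Spark is configured to use
   * store(serialized, Map("source" -> "example-host"))
   * }}}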
   */
  def store(bytes: ByteBuffer, metadata: Any) {
    supervisor.pushBytes(bytes, Some(metadata), None)
  }

  /** Report exceptions in receiving data. */
  def reportError(message: String, throwable: Throwable) {
    supervisor.reportError(message, throwable)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` will be reported to the driver.
   */
  def restart(message: String) {
    supervisor.restartReceiver(message)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` and `error` will be reported to the driver.
   */
  def restart(message: String, error: Throwable) {
    supervisor.restartReceiver(message, Some(error))
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is given by the `millisecond` parameter.
   */
  def restart(message: String, error: Throwable, millisecond: Int) {
    supervisor.restartReceiver(message, Some(error), millisecond)
  }

  /** Stop the receiver completely. */
  def stop(message: String) {
    supervisor.stop(message, None)
  }

  /** Stop the receiver completely due to an exception. */
  def stop(message: String, error: Throwable) {
    supervisor.stop(message, Some(error))
  }

  /** Check if the receiver has started or not. */
  def isStarted(): Boolean = {
    supervisor.isReceiverStarted()
  }

  /**
   * Check if the receiver has been marked for stopping. Use this to identify when
   * the receiving of data should be stopped.
   */
  def isStopped(): Boolean = {
    supervisor.isReceiverStopped()
  }

  /**
   * Get the unique identifier of the receiver input stream that this
   * receiver is associated with.
   */
  def streamId: Int = id

  /*
   * =================
   * Private methods
   * =================
   */

  /** Identifier of the stream this receiver is associated with. */
  private var id: Int = -1

  /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
  @transient private var _supervisor: ReceiverSupervisor = null

  /** Set the ID of the DStream that this receiver is associated with. */
  private[streaming] def setReceiverId(_id: Int) {
    id = _id
  }

  /** Attach the network receiver executor to this receiver. */
  private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
    assert(_supervisor == null)
    _supervisor = exec
  }

  /** Get the attached supervisor. */
  private[streaming] def supervisor: ReceiverSupervisor = {
    assert(_supervisor != null,
      "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
        "some computation in the receiver before the Receiver.onStart() has been called.")
    _supervisor
  }
}
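
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the upstream file: a minimal concrete
// receiver that reads text lines from a TCP socket, hands them to Spark with
// store(...), and schedules an asynchronous reconnect with restart(...) on
// errors, as described in the Scaladoc above. The class name, host, and port
// are hypothetical; only JDK socket/IO classes and the Receiver API defined
// in this file are assumed.
// ---------------------------------------------------------------------------
private class ExampleSocketStringReceiver(host: String, port: Int, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  @transient private var socket: java.net.Socket = null

  def onStart() {
    // onStart() must not block, so all receiving happens on a separate daemon thread.
    val thread = new Thread("Example Socket Receiver") {
      override def run() {
        try {
          socket = new java.net.Socket(host, port)
          val reader = new java.io.BufferedReader(new java.io.InputStreamReader(
            socket.getInputStream, java.nio.charset.StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped() && line != null) {
            store(line)               // each line becomes a single item in a data block
            line = reader.readLine()
          }
          // The remote end closed the stream; ask the supervisor to restart us later.
          restart("Connection closed by " + host)
        } catch {
          case e: java.io.IOException =>
            restart("Error receiving data from " + host, e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()
  }

  def onStop() {
    // Release the resources opened in onStart(); the receiving thread exits once
    // isStopped() returns true or the socket read fails after the close.
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
}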