1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.streaming.receiver
19
20import java.nio.ByteBuffer
21
22import scala.collection.mutable.ArrayBuffer
23import scala.collection.JavaConverters._
24
25import org.apache.spark.annotation.DeveloperApi
26import org.apache.spark.storage.StorageLevel
27
/**
 * :: DeveloperApi ::
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
 * should define the setup steps necessary to start receiving data,
 * and `onStop()` should define the cleanup steps necessary to stop receiving data.
 * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
 * or by stopping it completely with `stop(...)`.
 *
 * A custom receiver in Scala would look like this.
 *
 * {{{
 *  class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
 *      def onStart(): Unit = {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors need to be handled.
 *
 *          // See corresponding method documentation for more details
 *      }
 *
 *      def onStop(): Unit = {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *      }
 *  }
 * }}}
 *
 * A custom receiver in Java would look like this.
 *
 * {{{
 * class MyReceiver extends Receiver<String> {
 *     public MyReceiver(StorageLevel storageLevel) {
 *         super(storageLevel);
 *     }
 *
 *     public void onStart() {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors need to be handled.
 *
 *          // See corresponding method documentation for more details
 *     }
 *
 *     public void onStop() {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *     }
 * }
 * }}}
 *
 * All `store`, `restart`, `stop`, `reportError`, `isStarted` and `isStopped` calls are forwarded
 * to the attached [[ReceiverSupervisor]]; calling any of them before a supervisor has been
 * attached trips the assertion in `supervisor`.
 *
 * @tparam T type of the data items handed to `store(...)`
 * @param storageLevel storage level this receiver is constructed with; it is exposed as a public
 *                     `val`, and how it is applied to stored blocks is decided outside this class
 */
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  /**
   * This method is called by the system when the receiver is started. This function
   * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
   * This function must be non-blocking, so receiving the data must occur on a different
   * thread. Received data can be stored with Spark by calling `store(data)`.
   *
   * If there are errors in threads started here, then the following options are available:
   * (i) `reportError(...)` can be called to report the error to the driver.
   * The receiving of data will continue uninterrupted.
   * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
   * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
   * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
   * immediately, and then `onStart()` after a delay.
   */
  def onStart(): Unit

  /**
   * This method is called by the system when the receiver is stopped. All resources
   * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
   */
  def onStop(): Unit

  /** Override this to specify a preferred location (hostname). Defaults to `None`. */
  def preferredLocation: Option[String] = None

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   *
   * @param dataItem the item to hand to the supervisor
   */
  def store(dataItem: T): Unit = {
    supervisor.pushSingle(dataItem)
  }

  /**
   * Store an ArrayBuffer of received data as a data block into Spark's memory.
   *
   * @param dataBuffer the items forming the block; forwarded to the supervisor without metadata
   */
  def store(dataBuffer: ArrayBuffer[T]): Unit = {
    supervisor.pushArrayBuffer(dataBuffer, None, None)
  }

  /**
   * Store an ArrayBuffer of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param dataBuffer the items forming the block
   * @param metadata   arbitrary metadata forwarded to the supervisor together with the block
   */
  def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit = {
    supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   *
   * @param dataIterator Scala iterator over the items forming the block
   */
  def store(dataIterator: Iterator[T]): Unit = {
    supervisor.pushIterator(dataIterator, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param dataIterator Java iterator over the items forming the block; it is wrapped as a
   *                     Scala iterator before being forwarded
   * @param metadata     arbitrary metadata forwarded to the supervisor together with the block
   */
  def store(dataIterator: java.util.Iterator[T], metadata: Any): Unit = {
    supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   *
   * @param dataIterator Java iterator over the items forming the block; it is wrapped as a
   *                     Scala iterator before being forwarded
   */
  def store(dataIterator: java.util.Iterator[T]): Unit = {
    supervisor.pushIterator(dataIterator.asScala, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param dataIterator Scala iterator over the items forming the block
   * @param metadata     arbitrary metadata forwarded to the supervisor together with the block
   */
  def store(dataIterator: Iterator[T], metadata: Any): Unit = {
    supervisor.pushIterator(dataIterator, Some(metadata), None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory. Note
   * that the data in the ByteBuffer must be serialized using the same serializer
   * that Spark is configured to use.
   *
   * @param bytes the already-serialized block contents
   */
  def store(bytes: ByteBuffer): Unit = {
    supervisor.pushBytes(bytes, None, None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param bytes    the already-serialized block contents
   * @param metadata arbitrary metadata forwarded to the supervisor together with the block
   */
  def store(bytes: ByteBuffer, metadata: Any): Unit = {
    supervisor.pushBytes(bytes, Some(metadata), None)
  }

  /**
   * Report exceptions in receiving data.
   *
   * @param message   description of the problem
   * @param throwable the error being reported
   */
  def reportError(message: String, throwable: Throwable): Unit = {
    supervisor.reportError(message, throwable)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` will be reported to the driver.
   *
   * @param message reason for the restart
   */
  def restart(message: String): Unit = {
    supervisor.restartReceiver(message)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` and `error` will be reported to the driver.
   *
   * @param message reason for the restart
   * @param error   the error that triggered the restart
   */
  def restart(message: String, error: Throwable): Unit = {
    supervisor.restartReceiver(message, Some(error))
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread.
   *
   * @param message     reason for the restart
   * @param error       the error that triggered the restart
   * @param millisecond value forwarded to the supervisor's `restartReceiver`; presumably the
   *                    delay in milliseconds between stopping and starting -- TODO confirm
   *                    against ReceiverSupervisor
   */
  def restart(message: String, error: Throwable, millisecond: Int): Unit = {
    supervisor.restartReceiver(message, Some(error), millisecond)
  }

  /**
   * Stop the receiver completely.
   *
   * @param message reason for stopping
   */
  def stop(message: String): Unit = {
    supervisor.stop(message, None)
  }

  /**
   * Stop the receiver completely due to an exception.
   *
   * @param message reason for stopping
   * @param error   the error that caused the stop
   */
  def stop(message: String, error: Throwable): Unit = {
    supervisor.stop(message, Some(error))
  }

  /** Check if the receiver has started or not, as reported by the supervisor. */
  def isStarted(): Boolean = {
    supervisor.isReceiverStarted()
  }

  /**
   * Check if receiver has been marked for stopping. Use this to identify when
   * the receiving of data should be stopped.
   */
  def isStopped(): Boolean = {
    supervisor.isReceiverStopped()
  }

  /**
   * Get the unique identifier of the receiver input stream that this
   * receiver is associated with. The value is `-1` until `setReceiverId` has been called.
   */
  def streamId: Int = id

  /*
   * =================
   * Private methods
   * =================
   */

  /** Identifier of the stream this receiver is associated with; `-1` means "not set yet". */
  private var id: Int = -1

  /**
   * Handler object that runs the receiver. This is instantiated lazily in the worker.
   * Marked `@transient`, so it is not serialized with the receiver and starts out as `null`.
   */
  @transient private var _supervisor: ReceiverSupervisor = null

  /** Set the ID of the DStream that this receiver is associated with. */
  private[streaming] def setReceiverId(_id: Int): Unit = {
    id = _id
  }

  /**
   * Attach the supervisor that runs this receiver. A supervisor may only be attached once;
   * a second attachment trips the assertion.
   *
   * @param exec the supervisor to attach
   */
  private[streaming] def attachSupervisor(exec: ReceiverSupervisor): Unit = {
    assert(_supervisor == null,
      "A ReceiverSupervisor has already been attached to this receiver.")
    _supervisor = exec
  }

  /**
   * Get the attached supervisor. The assertion fails if no supervisor has been attached yet,
   * which is what happens when receiver methods are used before `onStart()` has been called.
   */
  private[streaming] def supervisor: ReceiverSupervisor = {
    assert(_supervisor != null,
      "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
        "some computation in the receiver before the Receiver.onStart() has been called.")
    _supervisor
  }
}
281
282