/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

/**
 * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
 * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
 * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
 * and the results are committed transactionally to the given [[Sink]].
 *
 * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
 *                               errors
 */
class StreamExecution(
    override val sparkSession: SparkSession,
    override val name: String,
    val checkpointRoot: String,
    analyzedPlan: LogicalPlan,
    val sink: Sink,
    val trigger: Trigger,
    val triggerClock: Clock,
    val outputMode: OutputMode,
    deleteCheckpointOnStop: Boolean)
  extends StreamingQuery with ProgressReporter with Logging {

  import org.apache.spark.sql.streaming.StreamingQueryListener._

  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")

  /**
   * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
   */
  private val awaitBatchLock = new ReentrantLock(true)
  private val awaitBatchLockCondition = awaitBatchLock.newCondition()

  private val initializationLatch = new CountDownLatch(1)
  private val startLatch = new CountDownLatch(1)
  private val terminationLatch = new CountDownLatch(1)

  /**
   * Tracks how much data we have processed and committed to the sink or state store from each
   * input source.
   * Only the scheduler thread should modify this field, and only in atomic steps.
   * Other threads should make a shallow copy if they are going to access this field more than
   * once, since the field's value may change at any time.
   */
  @volatile
  var committedOffsets = new StreamProgress

  /**
   * Tracks the offsets that are available to be processed, but have not yet been committed to the
   * sink.
   * Only the scheduler thread should modify this field, and only in atomic steps.
   * Other threads should make a shallow copy if they are going to access this field more than
   * once, since the field's value may change at any time.
   */
  @volatile
  var availableOffsets = new StreamProgress

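  // Example (illustrative names): since `committedOffsets` and `availableOffsets` are @volatile
  // and may be swapped by the scheduler thread at any time, a reader that needs a consistent view
  // should capture the reference once before using it repeatedly, e.g.:
  //
  //   val offsetsSnapshot = committedOffsets   // single volatile read
  //   sources.foreach { s => logDebug(s"committed for $s: ${offsetsSnapshot.get(s)}") }
  //
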
  /** The current batchId or -1 if execution has not yet been initialized. */
  protected var currentBatchId: Long = -1

  /** Metadata associated with the whole query */
  protected val streamMetadata: StreamMetadata = {
    val metadataPath = new Path(checkpointFile("metadata"))
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
      newMetadata
    }
  }

  /** Metadata associated with the offset seq of a batch in the query. */
  protected var offsetSeqMetadata = OffsetSeqMetadata()

  override val id: UUID = UUID.fromString(streamMetadata.id)

  override val runId: UUID = UUID.randomUUID

  /**
   * Pretty identifier string for printing in logs. If the name is set, the format is
   * "queryName [id = xyz, runId = abc]"; otherwise it is "[id = xyz, runId = abc]".
   */
  private val prettyIdString =
    Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"

  /**
   * All stream sources present in the query plan. This will be set when generating the
   * logical plan.
   */
  @volatile protected var sources: Seq[Source] = Seq.empty

  /**
   * A list of unique sources in the query plan. This will be set when generating the logical plan.
   */
  @volatile private var uniqueSources: Seq[Source] = Seq.empty

  override lazy val logicalPlan: LogicalPlan = {
    assert(microBatchThread eq Thread.currentThread,
      "logicalPlan must be initialized in StreamExecutionThread " +
        s"but the current thread was ${Thread.currentThread}")
    var nextSourceId = 0L
    val _logicalPlan = analyzedPlan.transform {
      case StreamingRelation(dataSource, _, output) =>
        // Materialize the source to avoid creating it in every batch
        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
        val source = dataSource.createSource(metadataPath)
        nextSourceId += 1
        // We still need to use the previous `output` instead of `source.schema`, as attributes in
        // "df.logicalPlan" have already been bound to the previous `output`.
        StreamingExecutionRelation(source, output)
    }
    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
    uniqueSources = sources.distinct
    _logicalPlan
  }
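
  // Illustrative example of the rewrite above (names are hypothetical): a plan reading two file
  // streams would be transformed roughly as
  //
  //   StreamingRelation(csvSource)  -> StreamingExecutionRelation(source0, output0)
  //   StreamingRelation(jsonSource) -> StreamingExecutionRelation(source1, output1)
  //
  // with per-source metadata under "$checkpointRoot/sources/0" and "$checkpointRoot/sources/1".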

  private val triggerExecutor = trigger match {
    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
  }
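
  // For example, a query started with `ProcessingTime("10 seconds")` is driven by a
  // ProcessingTimeExecutor that repeatedly invokes the batch function passed to `execute(...)`,
  // aiming to start a new trigger every 10 seconds. This is a sketch of the only trigger type
  // handled here; other trigger types would need additional cases in the match above.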

  /** Defines the internal state of execution */
  private val state = new AtomicReference[State](INITIALIZING)

  @volatile
  var lastExecution: IncrementalExecution = _

  /** Holds the most recent input data for each source. */
  protected var newData: Map[Source, DataFrame] = _

  @volatile
  private var streamDeathCause: StreamingQueryException = null

  /* Get the call site in the caller thread; will pass this into the micro batch thread */
  private val callSite = Utils.getCallSite()

  /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
  lazy val streamMetrics = new MetricsReporter(
    this, s"spark.streaming.${Option(name).getOrElse(id)}")

  /**
   * The thread that runs the micro-batches of this stream. Note that this thread must be an
   * [[org.apache.spark.util.UninterruptibleThread]] to work around KAFKA-1894 (interrupting a
   * running `KafkaConsumer` may cause an endless loop) and HADOOP-10622 (interrupting
   * `Shell.runCommand` causes a deadlock). (SPARK-14131)
   */
  val microBatchThread =
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
      override def run(): Unit = {
        // To fix call sites like "run at <unknown>:0", we bridge the call site from the caller
        // thread to this micro batch thread
        sparkSession.sparkContext.setCallSite(callSite)
        runBatches()
      }
    }

  /**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log *before* any
   * processing is done.  Thus, the Nth record in this log indicates data that is currently being
   * processed and the (N-1)th entry indicates which offsets have been durably committed to the
   * sink.
   */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  /** Whether all fields of the query have been initialized */
  private def isInitialized: Boolean = state.get != INITIALIZING

  /** Whether the query is currently active or not */
  override def isActive: Boolean = state.get != TERMINATED

  /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
  override def exception: Option[StreamingQueryException] = Option(streamDeathCause)

  /** Returns the path of a file with `name` in the checkpoint directory. */
  private def checkpointFile(name: String): String =
    new Path(new Path(checkpointRoot), name).toUri.toString
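
  // For example (illustrative values): with checkpointRoot = "/tmp/query-cp",
  // checkpointFile("offsets") resolves to "/tmp/query-cp/offsets".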

  /**
   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
   * has been posted to all the listeners.
   */
  def start(): Unit = {
    logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
    microBatchThread.setDaemon(true)
    microBatchThread.start()
    startLatch.await()  // Wait until thread started and QueryStart event has been posted
  }
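
  // A query is normally started through the public DataStreamWriter API, which (via the
  // StreamingQueryManager) constructs a StreamExecution and calls `start()`. A sketch, with
  // illustrative paths and options:
  //
  //   val query = df.writeStream
  //     .outputMode("append")
  //     .option("checkpointLocation", "/tmp/query-cp")
  //     .format("parquet")
  //     .start("/tmp/output")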

  /**
   * Repeatedly attempts to run batches as data arrives.
   *
   * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
   * posted such that listeners are guaranteed to get a start event before a termination.
   * Furthermore, this method also ensures that the [[QueryStartedEvent]] is posted before the
   * `start()` method returns.
   */
  private def runBatches(): Unit = {
    try {
      if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
        sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
      }

      // `postEvent` does not throw non-fatal exceptions.
      postEvent(new QueryStartedEvent(id, runId, name))

      // Unblock the starting thread
      startLatch.countDown()

      // While active, repeatedly attempt to run batches.
      SparkSession.setActiveSession(sparkSession)

      updateStatusMessage("Initializing sources")
      // force initialization of the logical plan so that the sources can be created
      logicalPlan
      if (state.compareAndSet(INITIALIZING, ACTIVE)) {
        // Unblock `awaitInitialization`
        initializationLatch.countDown()

        triggerExecutor.execute(() => {
          startTrigger()

          val continueToRun =
            if (isActive) {
              reportTimeTaken("triggerExecution") {
                if (currentBatchId < 0) {
                  // We'll do this initialization only once
                  populateStartOffsets()
                  logDebug(s"Stream running from $committedOffsets to $availableOffsets")
                } else {
                  constructNextBatch()
                }
                if (dataAvailable) {
                  currentStatus = currentStatus.copy(isDataAvailable = true)
                  updateStatusMessage("Processing new data")
                  runBatch()
                }
              }

              // Report trigger as finished and construct progress object.
              finishTrigger(dataAvailable)
              if (dataAvailable) {
                // We'll increase currentBatchId after we complete processing the current batch's data
                currentBatchId += 1
              } else {
                currentStatus = currentStatus.copy(isDataAvailable = false)
                updateStatusMessage("Waiting for data to arrive")
                Thread.sleep(pollingDelayMs)
              }
              true
            } else {
              false
            }

          // Update committed offsets.
          committedOffsets ++= availableOffsets
          updateStatusMessage("Waiting for next trigger")
          continueToRun
        })
        updateStatusMessage("Stopped")
      } else {
        // `stop()` has already been called. Let `finally` finish the cleanup.
      }
    } catch {
      case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
        updateStatusMessage("Stopped")
      case e: Throwable =>
        streamDeathCause = new StreamingQueryException(
          toDebugString(includeLogicalPlan = isInitialized),
          s"Query $prettyIdString terminated with exception: ${e.getMessage}",
          e,
          committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
        logError(s"Query $prettyIdString terminated with error", e)
        updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
        // Rethrow fatal errors so that the user can handle them with a
        // `Thread.UncaughtExceptionHandler`
        if (!NonFatal(e)) {
          throw e
        }
    } finally {
      // Release the latches to unblock user code, since an exception can happen anywhere and we
      // may not get a chance to release them otherwise
      startLatch.countDown()
      initializationLatch.countDown()

      try {
        stopSources()
        state.set(TERMINATED)
        currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)

        // Update metrics and status
        sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)

        // Notify others
        sparkSession.streams.notifyQueryTermination(StreamExecution.this)
        postEvent(
          new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))

        // Delete the temp checkpoint only when the query didn't fail
        if (deleteCheckpointOnStop && exception.isEmpty) {
          val checkpointPath = new Path(checkpointRoot)
          try {
            val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
            fs.delete(checkpointPath, true)
          } catch {
            case NonFatal(e) =>
              // Deleting the temp checkpoint folder is best effort; don't throw non-fatal
              // exceptions when we cannot delete it.
              logWarning(s"Cannot delete $checkpointPath", e)
          }
        }
      } finally {
        awaitBatchLock.lock()
        try {
          // Wake up any threads that are waiting for the stream to progress.
          awaitBatchLockCondition.signalAll()
        } finally {
          awaitBatchLock.unlock()
        }
        terminationLatch.countDown()
      }
    }
  }

  /**
   * Populate the start offsets to start the execution at the offsets recorded in the offset log
   * (i.e. avoid reprocessing data that we have already processed). This function must be called
   * before any processing occurs and will populate the following fields:
   *  - currentBatchId
   *  - committedOffsets
   *  - availableOffsets
   */
  private def populateStartOffsets(): Unit = {
    offsetLog.getLatest() match {
      case Some((batchId, nextOffsets)) =>
        logInfo(s"Resuming streaming query, starting with batch $batchId")
        currentBatchId = batchId
        availableOffsets = nextOffsets.toStreamProgress(sources)
        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
        logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
          s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")

        offsetLog.get(batchId - 1).foreach {
          case lastOffsets =>
            committedOffsets = lastOffsets.toStreamProgress(sources)
            logDebug(s"Resuming with committed offsets: $committedOffsets")
        }
      case None => // We are starting this stream for the first time.
        logInfo(s"Starting new streaming query.")
        currentBatchId = 0
        constructNextBatch()
    }
  }

  /**
   * Returns true if there is any new data available to be processed.
   */
  private def dataAvailable: Boolean = {
    availableOffsets.exists {
      case (source, available) =>
        committedOffsets
            .get(source)
            .map(committed => committed != available)
            .getOrElse(true)
    }
  }
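
  // Illustrative cases for the check above:
  //  - a source has an available offset but no committed offset        -> new data is available
  //  - every source's available offset equals its committed offset     -> no new data
  //  - any source's available offset differs from its committed offset -> new data is available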

  /**
   * Queries all of the sources to see if any new data is available. When there is new data the
   * batchId counter is incremented and a new log entry is written with the newest offsets.
   */
  private def constructNextBatch(): Unit = {
    // Check to see what new data is available.
    val hasNewData = {
      awaitBatchLock.lock()
      try {
        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
          updateStatusMessage(s"Getting offsets from $s")
          reportTimeTaken("getOffset") {
            (s, s.getOffset)
          }
        }.toMap
        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)

        if (dataAvailable) {
          true
        } else {
          noNewData = true
          false
        }
      } finally {
        awaitBatchLock.unlock()
      }
    }
    if (hasNewData) {
      // Current batch timestamp in milliseconds
      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
      // Update the eventTime watermark if we find one in the plan.
      if (lastExecution != null) {
        lastExecution.executedPlan.collect {
          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
            e.eventTimeStats.value.max - e.delayMs
        }.headOption.foreach { newWatermarkMs =>
          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
          } else {
            logDebug(
              s"Event time didn't move: $newWatermarkMs < " +
                s"${offsetSeqMetadata.batchWatermarkMs}")
          }
        }
      }

      updateStatusMessage("Writing offsets to log")
      reportTimeTaken("walCommit") {
        assert(offsetLog.add(
          currentBatchId,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
        logInfo(s"Committed offsets for batch $currentBatchId. " +
          s"Metadata ${offsetSeqMetadata.toString}")

        // NOTE: The following code is correct because runBatches() processes exactly one
        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
        // the same time), this cleanup logic will need to change.

        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
        // sources to discard data from the previous batch.
        val prevBatchOff = offsetLog.get(currentBatchId - 1)
        if (prevBatchOff.isDefined) {
          prevBatchOff.get.toStreamProgress(sources).foreach {
            case (src, off) => src.commit(off)
          }
        }

        // It is now safe to discard the metadata beyond the minimum number to retain.
        // Note that purge is exclusive, i.e. it purges everything before the target ID.
        if (minBatchesToRetain < currentBatchId) {
          offsetLog.purge(currentBatchId - minBatchesToRetain)
        }
      }
    } else {
      awaitBatchLock.lock()
      try {
        // Wake up any threads that are waiting for the stream to progress.
        awaitBatchLockCondition.signalAll()
      } finally {
        awaitBatchLock.unlock()
      }
    }
  }
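
  // Worked example of the watermark update above (illustrative numbers): if the largest event
  // time observed in the previous batch was 12:05:00 and the watermark delay is 10 minutes
  // (delayMs = 600000), the candidate watermark is 11:55:00. It is adopted only when it is
  // greater than the current `batchWatermarkMs`, so the watermark never moves backwards.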

  /**
   * Processes any data available between `availableOffsets` and `committedOffsets`.
   */
  private def runBatch(): Unit = {
    // Request unprocessed data from all sources.
    newData = reportTimeTaken("getBatch") {
      availableOffsets.flatMap {
        case (source, available)
          if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
          val current = committedOffsets.get(source)
          val batch = source.getBatch(current, available)
          logDebug(s"Retrieving data from $source: $current -> $available")
          Some(source -> batch)
        case _ => None
      }
    }

    // A list of attributes that will need to be updated.
    var replacements = new ArrayBuffer[(Attribute, Attribute)]
    // Replace sources in the logical plan with data that has arrived since the last batch.
    val withNewSources = logicalPlan transform {
      case StreamingExecutionRelation(source, output) =>
        newData.get(source).map { data =>
          val newPlan = data.logicalPlan
          assert(output.size == newPlan.output.size,
            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
            s"${Utils.truncatedString(newPlan.output, ",")}")
          replacements ++= output.zip(newPlan.output)
          newPlan
        }.getOrElse {
          LocalRelation(output)
        }
    }

    // Rewire the plan to use the new attributes that were returned by the source.
    val replacementMap = AttributeMap(replacements)
    val triggerLogicalPlan = withNewSources transformAllExpressions {
      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
      case ct: CurrentTimestamp =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
          ct.dataType)
      case cd: CurrentDate =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
          cd.dataType)
    }

    reportTimeTaken("queryPlanning") {
      lastExecution = new IncrementalExecution(
        sparkSession,
        triggerLogicalPlan,
        outputMode,
        checkpointFile("state"),
        currentBatchId,
        offsetSeqMetadata.batchWatermarkMs)
      lastExecution.executedPlan // Force the lazy generation of the execution plan
    }

    val nextBatch =
      new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))

    reportTimeTaken("addBatch") {
      sink.addBatch(currentBatchId, nextBatch)
    }

    awaitBatchLock.lock()
    try {
      // Wake up any threads that are waiting for the stream to progress.
      awaitBatchLockCondition.signalAll()
    } finally {
      awaitBatchLock.unlock()
    }
  }

  override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
    sparkSession.streams.postListenerEvent(event)
  }

  /** Stops all streaming sources safely. */
  private def stopSources(): Unit = {
    uniqueSources.foreach { source =>
      try {
        source.stop()
      } catch {
        case NonFatal(e) =>
          logWarning(s"Failed to stop streaming source: $source. Resources may have leaked.", e)
      }
    }
  }

  /**
   * Signals to the thread executing micro-batches that it should stop running after the next
   * batch. This method blocks until the thread stops running.
   */
  override def stop(): Unit = {
    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
    // intentionally
    state.set(TERMINATED)
    if (microBatchThread.isAlive) {
      microBatchThread.interrupt()
      microBatchThread.join()
    }
    logInfo(s"Query $prettyIdString was stopped")
  }

  /**
   * Blocks the current thread until processing for data from the given `source` has reached at
   * least the given `Offset`. This method is intended for use primarily when writing tests.
   */
  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
    assertAwaitThread()
    def notDone = {
      val localCommittedOffsets = committedOffsets
      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    }

    while (notDone) {
      awaitBatchLock.lock()
      try {
        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
        if (streamDeathCause != null) {
          throw streamDeathCause
        }
      } finally {
        awaitBatchLock.unlock()
      }
    }
    logDebug(s"Unblocked at $newOffset for $source")
  }

  /** A flag to indicate that a batch has completed with no new data available. */
  @volatile private var noNewData = false

  /**
   * Asserts that the await APIs are not called from the stream thread. Otherwise, it may cause a
   * deadlock, e.g., calling any await APIs in `StreamingQueryListener.onQueryStarted` will block
   * the stream thread forever.
   */
  private def assertAwaitThread(): Unit = {
    if (microBatchThread eq Thread.currentThread) {
      throw new IllegalStateException(
        "Cannot wait for a query state from the same thread that is running the query")
    }
  }

  /**
   * Await until all fields of the query have been initialized.
   */
  def awaitInitialization(timeoutMs: Long): Unit = {
    assertAwaitThread()
    require(timeoutMs > 0, "Timeout has to be positive")
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
    initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
  }

  override def processAllAvailable(): Unit = {
    assertAwaitThread()
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
    awaitBatchLock.lock()
    try {
      noNewData = false
      while (true) {
        awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
        if (streamDeathCause != null) {
          throw streamDeathCause
        }
        if (noNewData) {
          return
        }
      }
    } finally {
      awaitBatchLock.unlock()
    }
  }

  override def awaitTermination(): Unit = {
    assertAwaitThread()
    terminationLatch.await()
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
  }

  override def awaitTermination(timeoutMs: Long): Boolean = {
    assertAwaitThread()
    require(timeoutMs > 0, "Timeout has to be positive")
    terminationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
    if (streamDeathCause != null) {
      throw streamDeathCause
    } else {
      !isActive
    }
  }
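
  // Typical usage from a test or driver program (a sketch; the query below is illustrative):
  //
  //   val query = df.writeStream.format("memory").queryName("t").start()
  //   query.processAllAvailable()      // blocks until all currently available data is processed
  //   query.awaitTermination(10000)    // returns false if the query is still active after 10 s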

  /** Exposed for tests */
  def explainInternal(extended: Boolean): String = {
    if (lastExecution == null) {
      "No physical plan. Waiting for data."
    } else {
      val explain = StreamingExplainCommand(lastExecution, extended = extended)
      sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
        .map(_.getString(0)).mkString("\n")
    }
  }

  override def explain(extended: Boolean): Unit = {
    // scalastyle:off println
    println(explainInternal(extended))
    // scalastyle:on println
  }

  override def explain(): Unit = explain(extended = false)

  override def toString: String = {
    s"Streaming Query $prettyIdString [state = $state]"
  }

  private def toDebugString(includeLogicalPlan: Boolean): String = {
    val debugString =
      s"""|=== Streaming Query ===
          |Identifier: $prettyIdString
          |Current Committed Offsets: $committedOffsets
          |Current Available Offsets: $availableOffsets
          |
          |Current State: $state
          |Thread State: ${microBatchThread.getState}""".stripMargin
    if (includeLogicalPlan) {
      debugString + s"\n\nLogical Plan:\n$logicalPlan"
    } else {
      debugString
    }
  }

  trait State
  case object INITIALIZING extends State
  case object ACTIVE extends State
  case object TERMINATED extends State
}


/**
 * A special thread used to run the stream query. Some code requires running in a
 * StreamExecutionThread and uses `classOf[StreamExecutionThread]` to check for it.
 */
abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name)

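// Code that must run on the stream thread can check for it with a pattern like the following
// (a sketch of the check described in the comment above):
//
//   assert(Thread.currentThread.isInstanceOf[StreamExecutionThread],
//     s"expected the stream execution thread but got ${Thread.currentThread}")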