/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

/**
 * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
 * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
 * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
 * and the results are committed transactionally to the given [[Sink]].
 *
 * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
 *                               errors
 */
class StreamExecution(
    override val sparkSession: SparkSession,
    override val name: String,
    val checkpointRoot: String,
    analyzedPlan: LogicalPlan,
    val sink: Sink,
    val trigger: Trigger,
    val triggerClock: Clock,
    val outputMode: OutputMode,
    deleteCheckpointOnStop: Boolean)
  extends StreamingQuery with ProgressReporter with Logging {

  import org.apache.spark.sql.streaming.StreamingQueryListener._

  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")

  /**
   * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
   */
  private val awaitBatchLock = new ReentrantLock(true)
  private val awaitBatchLockCondition = awaitBatchLock.newCondition()

  private val initializationLatch = new CountDownLatch(1)
  private val startLatch = new CountDownLatch(1)
  private val terminationLatch = new CountDownLatch(1)

  /**
   * Tracks how much data we have processed and committed to the sink or state store from each
   * input source.
   * Only the scheduler thread should modify this field, and only in atomic steps.
   * Other threads should make a shallow copy if they are going to access this field more than
   * once, since the field's value may change at any time.
   */
  @volatile
  var committedOffsets = new StreamProgress

  /**
   * Tracks the offsets that are available to be processed, but have not yet been committed to the
   * sink.
   * Only the scheduler thread should modify this field, and only in atomic steps.
   * Other threads should make a shallow copy if they are going to access this field more than
   * once, since the field's value may change at any time.
   */
  @volatile
  var availableOffsets = new StreamProgress

  /** The current batchId or -1 if execution has not yet been initialized. */
  protected var currentBatchId: Long = -1

  /**
   * Metadata associated with the whole query. Read from `<checkpointRoot>/metadata`; if none
   * exists yet, a new one with a random id is created and written there.
   */
  protected val streamMetadata: StreamMetadata = {
    val metadataPath = new Path(checkpointFile("metadata"))
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
      newMetadata
    }
  }

  /** Metadata associated with the offset seq of a batch in the query. */
  protected var offsetSeqMetadata = OffsetSeqMetadata()

  /** Query id; stable across restarts because it is persisted in the checkpoint metadata. */
  override val id: UUID = UUID.fromString(streamMetadata.id)

  /** Id of this particular run; freshly generated every time the object is created. */
  override val runId: UUID = UUID.randomUUID

  /**
   * Pretty identifier string for printing in logs. The format is
   * "queryName [id = xyz, runId = abc]" if a name is set, otherwise "[id = xyz, runId = abc]".
   */
  private val prettyIdString =
    Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"

  /**
   * All stream sources present in the query plan. This will be set when generating logical plan.
   */
  @volatile protected var sources: Seq[Source] = Seq.empty

  /**
   * A list of unique sources in the query plan. This will be set when generating logical plan.
   */
  @volatile private var uniqueSources: Seq[Source] = Seq.empty

  /**
   * The analyzed plan with every [[StreamingRelation]] replaced by a
   * [[StreamingExecutionRelation]] over a materialized [[Source]]. Also populates `sources` and
   * `uniqueSources`. Must be first evaluated on the micro-batch thread.
   */
  override lazy val logicalPlan: LogicalPlan = {
    assert(microBatchThread eq Thread.currentThread,
      "logicalPlan must be initialized in StreamExecutionThread " +
        s"but the current thread was ${Thread.currentThread}")
    var nextSourceId = 0L
    val _logicalPlan = analyzedPlan.transform {
      case StreamingRelation(dataSource, _, output) =>
        // Materialize source to avoid creating it in every batch
        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
        val source = dataSource.createSource(metadataPath)
        nextSourceId += 1
        // We still need to use the previous `output` instead of `source.schema` as attributes in
        // "df.logicalPlan" has already used attributes of the previous `output`.
        StreamingExecutionRelation(source, output)
    }
    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
    uniqueSources = sources.distinct
    _logicalPlan
  }

  private val triggerExecutor = trigger match {
    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
  }

  /** Defines the internal state of execution */
  private val state = new AtomicReference[State](INITIALIZING)

  @volatile
  var lastExecution: IncrementalExecution = _

  /** Holds the most recent input data for each source. */
  protected var newData: Map[Source, DataFrame] = _

  /** The failure that terminated the query, or null while it is running or if it stopped cleanly. */
  @volatile
  private var streamDeathCause: StreamingQueryException = null

  /* Get the call site in the caller thread; will pass this into the micro batch thread */
  private val callSite = Utils.getCallSite()

  /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
  lazy val streamMetrics = new MetricsReporter(
    this, s"spark.streaming.${Option(name).getOrElse(id)}")

  /**
   * The thread that runs the micro-batches of this stream. Note that this thread must be
   * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
   * running `KafkaConsumer` may cause endless loop, and HADOOP-10622: interrupting
   * `Shell.runCommand` causes deadlock. (SPARK-14131)
   */
  val microBatchThread =
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
      override def run(): Unit = {
        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
        // thread to this micro batch thread
        sparkSession.sparkContext.setCallSite(callSite)
        runBatches()
      }
    }

  /**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log *before* any
   * processing is done. Thus, the Nth record in this log indicates data that is currently being
   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
   */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  /** Whether all fields of the query have been initialized */
  private def isInitialized: Boolean = state.get != INITIALIZING

  /** Whether the query is currently active or not */
  override def isActive: Boolean = state.get != TERMINATED

  /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
  override def exception: Option[StreamingQueryException] = Option(streamDeathCause)

  /** Returns the path of a file with `name` in the checkpoint directory. */
  private def checkpointFile(name: String): String =
    new Path(new Path(checkpointRoot), name).toUri.toString

  /**
   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
   * has been posted to all the listeners.
   */
  def start(): Unit = {
    logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
    microBatchThread.setDaemon(true)
    microBatchThread.start()
    startLatch.await()  // Wait until thread started and QueryStart event has been posted
  }

  /**
   * Repeatedly attempts to run batches as data arrives.
   *
   * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
   * posted such that listeners are guaranteed to get a start event before a termination.
   * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the
   * `start()` method returns.
   */
  private def runBatches(): Unit = {
    try {
      if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
        sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
      }

      // `postEvent` does not throw non fatal exception.
      postEvent(new QueryStartedEvent(id, runId, name))

      // Unblock starting thread
      startLatch.countDown()

      // While active, repeatedly attempt to run batches.
      SparkSession.setActiveSession(sparkSession)

      updateStatusMessage("Initializing sources")
      // force initialization of the logical plan so that the sources can be created
      logicalPlan
      if (state.compareAndSet(INITIALIZING, ACTIVE)) {
        // Unblock `awaitInitialization`
        initializationLatch.countDown()

        triggerExecutor.execute(() => {
          startTrigger()

          val continueToRun =
            if (isActive) {
              reportTimeTaken("triggerExecution") {
                if (currentBatchId < 0) {
                  // We'll do this initialization only once
                  populateStartOffsets()
                  logDebug(s"Stream running from $committedOffsets to $availableOffsets")
                } else {
                  constructNextBatch()
                }
                if (dataAvailable) {
                  currentStatus = currentStatus.copy(isDataAvailable = true)
                  updateStatusMessage("Processing new data")
                  runBatch()
                }
              }

              // Report trigger as finished and construct progress object.
              finishTrigger(dataAvailable)
              if (dataAvailable) {
                // We'll increase currentBatchId after we complete processing current batch's data
                currentBatchId += 1
              } else {
                currentStatus = currentStatus.copy(isDataAvailable = false)
                updateStatusMessage("Waiting for data to arrive")
                Thread.sleep(pollingDelayMs)
              }
              true
            } else {
              false
            }

          // Update committed offsets.
          committedOffsets ++= availableOffsets
          updateStatusMessage("Waiting for next trigger")
          continueToRun
        })
        updateStatusMessage("Stopped")
      } else {
        // `stop()` is already called. Let `finally` finish the cleanup.
      }
    } catch {
      case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
        updateStatusMessage("Stopped")
      case e: Throwable =>
        streamDeathCause = new StreamingQueryException(
          toDebugString(includeLogicalPlan = isInitialized),
          s"Query $prettyIdString terminated with exception: ${e.getMessage}",
          e,
          committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
        logError(s"Query $prettyIdString terminated with error", e)
        updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
        // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
        // handle them
        if (!NonFatal(e)) {
          throw e
        }
    } finally {
      // Release latches to unblock the user codes since exception can happen in any place and we
      // may not get a chance to release them
      startLatch.countDown()
      initializationLatch.countDown()

      try {
        stopSources()
        state.set(TERMINATED)
        currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)

        // Update metrics and status
        sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)

        // Notify others
        sparkSession.streams.notifyQueryTermination(StreamExecution.this)
        postEvent(
          new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))

        // Delete the temp checkpoint only when the query didn't fail
        if (deleteCheckpointOnStop && exception.isEmpty) {
          val checkpointPath = new Path(checkpointRoot)
          try {
            val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
            fs.delete(checkpointPath, true)
          } catch {
            case NonFatal(e) =>
              // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
              // when we cannot delete them.
              logWarning(s"Cannot delete $checkpointPath", e)
          }
        }
      } finally {
        awaitBatchLock.lock()
        try {
          // Wake up any threads that are waiting for the stream to progress.
          awaitBatchLockCondition.signalAll()
        } finally {
          awaitBatchLock.unlock()
        }
        terminationLatch.countDown()
      }
    }
  }

  /**
   * Populate the start offsets to start the execution at the current offsets stored in the sink
   * (i.e. avoid reprocessing data that we have already processed). This function must be called
   * before any processing occurs and will populate the following fields:
   *  - currentBatchId
   *  - committedOffsets
   *  - availableOffsets
   */
  private def populateStartOffsets(): Unit = {
    offsetLog.getLatest() match {
      case Some((batchId, nextOffsets)) =>
        logInfo(s"Resuming streaming query, starting with batch $batchId")
        currentBatchId = batchId
        availableOffsets = nextOffsets.toStreamProgress(sources)
        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
        logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
          s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")

        // The entry before the latest one (if any) holds the offsets already committed.
        offsetLog.get(batchId - 1).foreach { lastOffsets =>
          committedOffsets = lastOffsets.toStreamProgress(sources)
          logDebug(s"Resuming with committed offsets: $committedOffsets")
        }
      case None => // We are starting this stream for the first time.
        logInfo("Starting new streaming query.")
        currentBatchId = 0
        constructNextBatch()
    }
  }

  /**
   * Returns true if there is any new data available to be processed, i.e. some source's available
   * offset is missing from, or differs from, its committed offset.
   */
  private def dataAvailable: Boolean = {
    availableOffsets.exists {
      case (source, available) =>
        committedOffsets
          .get(source)
          .map(committed => committed != available)
          .getOrElse(true)
    }
  }

  /**
   * Queries all of the sources to see if any new data is available. When there is new data the
   * batchId counter is incremented and a new log entry is written with the newest offsets.
   * When there is none, `noNewData` is set and waiters on `awaitBatchLockCondition` are woken.
   */
  private def constructNextBatch(): Unit = {
    // Check to see what new data is available.
    val hasNewData = {
      awaitBatchLock.lock()
      try {
        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
          updateStatusMessage(s"Getting offsets from $s")
          reportTimeTaken("getOffset") {
            (s, s.getOffset)
          }
        }.toMap
        // Only sources that reported an offset contribute to the available offsets.
        availableOffsets ++= latestOffsets.collect { case (source, Some(offset)) => source -> offset }

        if (dataAvailable) {
          true
        } else {
          noNewData = true
          false
        }
      } finally {
        awaitBatchLock.unlock()
      }
    }
    if (hasNewData) {
      // Current batch timestamp in milliseconds
      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
      // Update the eventTime watermark if we find one in the plan.
      if (lastExecution != null) {
        lastExecution.executedPlan.collect {
          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
            e.eventTimeStats.value.max - e.delayMs
        }.headOption.foreach { newWatermarkMs =>
          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
          } else {
            logDebug(
              s"Event time didn't move: $newWatermarkMs < " +
                s"${offsetSeqMetadata.batchWatermarkMs}")
          }
        }
      }

      updateStatusMessage("Writing offsets to log")
      reportTimeTaken("walCommit") {
        assert(offsetLog.add(
          currentBatchId,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
        logInfo(s"Committed offsets for batch $currentBatchId. " +
          s"Metadata ${offsetSeqMetadata.toString}")

        // NOTE: The following code is correct because runBatches() processes exactly one
        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
        // the same time), this cleanup logic will need to change.

        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
        // sources to discard data from the previous batch.
        offsetLog.get(currentBatchId - 1).foreach { prevBatchOff =>
          prevBatchOff.toStreamProgress(sources).foreach {
            case (src, off) => src.commit(off)
          }
        }

        // It is now safe to discard the metadata beyond the minimum number to retain.
        // Note that purge is exclusive, i.e. it purges everything before the target ID.
        if (minBatchesToRetain < currentBatchId) {
          offsetLog.purge(currentBatchId - minBatchesToRetain)
        }
      }
    } else {
      awaitBatchLock.lock()
      try {
        // Wake up any threads that are waiting for the stream to progress.
        awaitBatchLockCondition.signalAll()
      } finally {
        awaitBatchLock.unlock()
      }
    }
  }

  /**
   * Processes any data available between `availableOffsets` and `committedOffsets`.
   */
  private def runBatch(): Unit = {
    // Request unprocessed data from all sources.
    newData = reportTimeTaken("getBatch") {
      availableOffsets.flatMap {
        case (source, available)
          if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
          val current = committedOffsets.get(source)
          val batch = source.getBatch(current, available)
          logDebug(s"Retrieving data from $source: $current -> $available")
          Some(source -> batch)
        case _ => None
      }
    }

    // A list of attributes that will need to be updated. The buffer itself is mutated in place,
    // so the reference never needs to be reassigned.
    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    // Replace sources in the logical plan with data that has arrived since the last batch.
    val withNewSources = logicalPlan transform {
      case StreamingExecutionRelation(source, output) =>
        newData.get(source).map { data =>
          val newPlan = data.logicalPlan
          assert(output.size == newPlan.output.size,
            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
            s"${Utils.truncatedString(newPlan.output, ",")}")
          replacements ++= output.zip(newPlan.output)
          newPlan
        }.getOrElse {
          LocalRelation(output)
        }
    }

    // Rewire the plan to use the new attributes that were returned by the source.
    val replacementMap = AttributeMap(replacements)
    val triggerLogicalPlan = withNewSources transformAllExpressions {
      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
      case ct: CurrentTimestamp =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
          ct.dataType)
      case cd: CurrentDate =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
          cd.dataType)
    }

    reportTimeTaken("queryPlanning") {
      lastExecution = new IncrementalExecution(
        sparkSession,
        triggerLogicalPlan,
        outputMode,
        checkpointFile("state"),
        currentBatchId,
        offsetSeqMetadata.batchWatermarkMs)
      lastExecution.executedPlan // Force the lazy generation of execution plan
    }

    val nextBatch =
      new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))

    reportTimeTaken("addBatch") {
      sink.addBatch(currentBatchId, nextBatch)
    }

    awaitBatchLock.lock()
    try {
      // Wake up any threads that are waiting for the stream to progress.
      awaitBatchLockCondition.signalAll()
    } finally {
      awaitBatchLock.unlock()
    }
  }

  override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
    sparkSession.streams.postListenerEvent(event)
  }

  /** Stops all streaming sources safely. */
  private def stopSources(): Unit = {
    uniqueSources.foreach { source =>
      try {
        source.stop()
      } catch {
        case NonFatal(e) =>
          logWarning(s"Failed to stop streaming source: $source. Resources may have leaked.", e)
      }
    }
  }

  /**
   * Signals to the thread executing micro-batches that it should stop running after the next
   * batch. This method blocks until the thread stops running.
   */
  override def stop(): Unit = {
    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
    // intentionally
    state.set(TERMINATED)
    if (microBatchThread.isAlive) {
      microBatchThread.interrupt()
      microBatchThread.join()
    }
    logInfo(s"Query $prettyIdString was stopped")
  }

  /**
   * Blocks the current thread until processing for data from the given `source` has reached at
   * least the given `Offset`. This method is intended for use primarily when writing tests.
   */
  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
    assertAwaitThread()
    def notDone = {
      val localCommittedOffsets = committedOffsets
      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    }

    while (notDone) {
      awaitBatchLock.lock()
      try {
        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
        if (streamDeathCause != null) {
          throw streamDeathCause
        }
      } finally {
        awaitBatchLock.unlock()
      }
    }
    logDebug(s"Unblocked at $newOffset for $source")
  }

  /** A flag to indicate that a batch has completed with no new data available. */
  @volatile private var noNewData = false

  /**
   * Assert that the await APIs should not be called in the stream thread. Otherwise, it may cause
   * dead-lock, e.g., calling any await APIs in `StreamingQueryListener.onQueryStarted` will block
   * the stream thread forever.
   */
  private def assertAwaitThread(): Unit = {
    if (microBatchThread eq Thread.currentThread) {
      throw new IllegalStateException(
        "Cannot wait for a query state from the same thread that is running the query")
    }
  }

  /**
   * Await until all fields of the query have been initialized.
   */
  def awaitInitialization(timeoutMs: Long): Unit = {
    assertAwaitThread()
    require(timeoutMs > 0, "Timeout has to be positive")
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
    initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
  }

  /**
   * Blocks until a trigger finds no new data to process, or the query terminates.
   *
   * Returns normally if the query stops without an error (including when it was already stopped
   * on entry). A stopped query never sets `noNewData` again, so without that check this would
   * wait forever.
   *
   * @throws StreamingQueryException if the query has terminated with an exception
   */
  override def processAllAvailable(): Unit = {
    assertAwaitThread()

    // Returns true if the query has terminated cleanly; rethrows the failure if it died.
    // `state` is read before `streamDeathCause` because `runBatches` records the death cause
    // before it marks the query TERMINATED, so a failure recorded on that path is not mistaken
    // for a clean stop.
    def terminatedCleanly(): Boolean = {
      val terminated = !isActive
      if (streamDeathCause != null) {
        throw streamDeathCause
      }
      terminated
    }

    if (terminatedCleanly()) return
    awaitBatchLock.lock()
    try {
      noNewData = false
      while (true) {
        awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
        if (terminatedCleanly() || noNewData) {
          return
        }
      }
    } finally {
      awaitBatchLock.unlock()
    }
  }

  override def awaitTermination(): Unit = {
    assertAwaitThread()
    terminationLatch.await()
    if (streamDeathCause != null) {
      throw streamDeathCause
    }
  }

  override def awaitTermination(timeoutMs: Long): Boolean = {
    assertAwaitThread()
    require(timeoutMs > 0, "Timeout has to be positive")
    terminationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
    if (streamDeathCause != null) {
      throw streamDeathCause
    } else {
      !isActive
    }
  }

  /** Expose for tests */
  def explainInternal(extended: Boolean): String = {
    if (lastExecution == null) {
      "No physical plan. Waiting for data."
    } else {
      val explain = StreamingExplainCommand(lastExecution, extended = extended)
      sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
        .map(_.getString(0)).mkString("\n")
    }
  }

  override def explain(extended: Boolean): Unit = {
    // scalastyle:off println
    println(explainInternal(extended))
    // scalastyle:on println
  }

  override def explain(): Unit = explain(extended = false)

  override def toString: String = {
    s"Streaming Query $prettyIdString [state = $state]"
  }

  private def toDebugString(includeLogicalPlan: Boolean): String = {
    val debugString =
      s"""|=== Streaming Query ===
          |Identifier: $prettyIdString
          |Current Committed Offsets: $committedOffsets
          |Current Available Offsets: $availableOffsets
          |
          |Current State: $state
          |Thread State: ${microBatchThread.getState}""".stripMargin
    if (includeLogicalPlan) {
      debugString + s"\n\nLogical Plan:\n$logicalPlan"
    } else {
      debugString
    }
  }

  trait State
  case object INITIALIZING extends State
  case object ACTIVE extends State
  case object TERMINATED extends State
}


/**
 * A special thread to run the stream query. Some code must run in the StreamExecutionThread and
 * uses `classOf[StreamExecutionThread]` to check.
 */
abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name)