/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}

import scala.collection.mutable
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock

/**
 * Responsible for continually reporting statistics about the amount of data processed as well
 * as latency for a streaming query. This trait is designed to be mixed into the
 * [[StreamExecution]], which is responsible for calling `startTrigger` and `finishTrigger`
 * at the appropriate times. Additionally, the status can be updated with `updateStatusMessage`
 * to allow reporting on the stream's current state (e.g. "Fetching more data").
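 *
 * A sketch of the expected call pattern (illustrative only; the actual call sites live in
 * [[StreamExecution]], and the `"triggerExecution"` key is just an example):
 * {{{
 *   startTrigger()
 *   reportTimeTaken("triggerExecution") {
 *     updateStatusMessage("Processing new data")
 *     // ... run the batch ...
 *   }
 *   finishTrigger(hasNewData = true)
 * }}}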
 */
trait ProgressReporter extends Logging {

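  /**
   * Statistics about the most recent execution, used to assemble a
   * [[StreamingQueryProgress]] update.
   */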
  case class ExecutionStats(
    inputRows: Map[Source, Long],
    stateOperators: Seq[StateOperatorProgress],
    eventTimeStats: Map[String, String])

  // Internal state of the stream, required for computing metrics.
  protected def id: UUID
  protected def runId: UUID
  protected def name: String
  protected def triggerClock: Clock
  protected def logicalPlan: LogicalPlan
  protected def lastExecution: QueryExecution
  protected def newData: Map[Source, DataFrame]
  protected def availableOffsets: StreamProgress
  protected def committedOffsets: StreamProgress
  protected def sources: Seq[Source]
  protected def sink: Sink
  protected def offsetSeqMetadata: OffsetSeqMetadata
  protected def currentBatchId: Long
  protected def sparkSession: SparkSession
  protected def postEvent(event: StreamingQueryListener.Event): Unit

  // Local timestamps and counters.
  private var currentTriggerStartTimestamp = -1L
  private var currentTriggerEndTimestamp = -1L
  // TODO: Restore this from the checkpoint when possible.
  private var lastTriggerStartTimestamp = -1L
  private val currentDurationsMs = new mutable.HashMap[String, Long]()

  /** Flag that signals whether any errors with input metrics have already been logged. */
  private var metricWarningLogged: Boolean = false

  /** Holds the most recent query progress updates.  Accesses must lock on the queue itself. */
  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()

  private val noDataProgressEventInterval =
    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

  // The timestamp at which we last reported a progress event with no input data.
  private var lastNoDataProgressEventTime = Long.MinValue

  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

  @volatile
  protected var currentStatus: StreamingQueryStatus = {
    new StreamingQueryStatus(
      message = "Initializing StreamExecution",
      isDataAvailable = false,
      isTriggerActive = false)
  }

  /** Returns the current status of the query. */
  def status: StreamingQueryStatus = currentStatus

  /** Returns an array containing the most recent query progress updates. */
  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
    progressBuffer.toArray
  }

  /** Returns the most recent query progress update or null if there were no progress updates. */
  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
    progressBuffer.lastOption.orNull
  }

  /** Begins recording statistics about query progress for a given trigger. */
  protected def startTrigger(): Unit = {
    logDebug("Starting Trigger Calculation")
    lastTriggerStartTimestamp = currentTriggerStartTimestamp
    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
    currentStatus = currentStatus.copy(isTriggerActive = true)
    currentDurationsMs.clear()
  }

  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
    progressBuffer.synchronized {
      progressBuffer += newProgress
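      // Trim the buffer so it never grows beyond the configured retention
      // (`streamingProgressRetention`).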
      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
        progressBuffer.dequeue()
      }
    }
    postEvent(new QueryProgressEvent(newProgress))
    logInfo(s"Streaming query made progress: $newProgress")
  }

  /** Finalizes the query progress and adds it to the list of recent progress updates. */
  protected def finishTrigger(hasNewData: Boolean): Unit = {
    currentTriggerEndTimestamp = triggerClock.getTimeMillis()

    val executionStats = extractExecutionStats(hasNewData)
    val processingTimeSec =
      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000

    val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000
    } else {
      Double.NaN
    }
    logDebug(s"Execution stats: $executionStats")

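    // Note: on the first trigger `inputTimeSec` is NaN, so `inputRowsPerSecond` below is
    // reported as NaN rather than a misleading rate.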
    val sourceProgress = sources.map { source =>
      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
      new SourceProgress(
        description = source.toString,
        startOffset = committedOffsets.get(source).map(_.json).orNull,
        endOffset = availableOffsets.get(source).map(_.json).orNull,
        numInputRows = numRecords,
        inputRowsPerSecond = numRecords / inputTimeSec,
        processedRowsPerSecond = numRecords / processingTimeSec
      )
    }
    val sinkProgress = new SinkProgress(sink.toString)

    val newProgress = new StreamingQueryProgress(
      id = id,
      runId = runId,
      name = name,
      timestamp = formatTimestamp(currentTriggerStartTimestamp),
      batchId = currentBatchId,
      durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava),
      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
      stateOperators = executionStats.stateOperators.toArray,
      sources = sourceProgress.toArray,
      sink = sinkProgress)

    if (hasNewData) {
      // Reset noDataEventTimestamp if we processed any data
      lastNoDataProgressEventTime = Long.MinValue
      updateProgress(newProgress)
    } else {
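      // Rate-limit progress events when no data was processed: emit at most one such event
      // per `noDataProgressEventInterval` milliseconds.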
      val now = triggerClock.getTimeMillis()
      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
        lastNoDataProgressEventTime = now
        updateProgress(newProgress)
      }
    }

    currentStatus = currentStatus.copy(isTriggerActive = false)
  }

  /** Extracts statistics about stateful operators from the executed query plan. */
  private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
    if (lastExecution == null) return Nil
    // lastExecution could belong to one of the previous triggers if `!hasNewData`.
    // Walking the plan again should be inexpensive.
    val stateNodes = lastExecution.executedPlan.collect {
      case p: StateStoreSaveExec => p
    }
    stateNodes.map { node =>
      val numRowsUpdated = if (hasNewData) {
        node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
      } else {
        // If no data was processed, no state rows can have been updated.
        0L
      }
      new StateOperatorProgress(
        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
        numRowsUpdated = numRowsUpdated)
    }
  }

  /** Extracts statistics from the most recent query execution. */
  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
    val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
    val watermarkTimestamp =
      if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
      else Map.empty[String, String]

    // SPARK-19378: Still report stateful operator metrics even when no data was processed
    // in this trigger.
    val stateOperators = extractStateOperatorMetrics(hasNewData)

    if (!hasNewData) {
      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
    }

    // We want to associate execution plan leaves to sources that generate them, so that we can
    // match their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    // Consider the translation from the streaming logical plan to the final executed plan.
    //
    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    //
    // 1. We keep track of the streaming sources associated with each leaf in the trigger's
    //    logical plan.
    //    - Each logical plan leaf will be associated with a single streaming source.
    //    - There can be multiple logical plan leaves associated with a streaming source.
    //    - There can be leaves not associated with any streaming source, because they were
    //      generated from a batch source (e.g. stream-batch joins); see the example below.
    //
    // 2. Assuming that the executed plan has the same number of leaves in the same order as
    //    that of the trigger's logical plan, we associate executed plan leaves with the
    //    corresponding streaming sources.
    //
    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    //
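    // For example (illustrative, not taken from a specific query): in a stream-batch join
    // such as `streamingDF.join(batchDF)`, the trigger's logical plan has two leaves, but
    // only the leaf originating from `streamingDF` appears in `logicalPlanLeafToSource`;
    // the batch leaf is simply skipped when summing up input rows per source.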
    val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
    }
    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    val numInputRows: Map[Source, Long] =
      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
        val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
        }
        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
          source -> numRows
        }
        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
      } else {
        if (!metricWarningLogged) {
          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
          logWarning(
            "Could not report metrics because the number of leaves in the trigger's logical " +
                "plan did not match that of the execution plan:\n" +
                s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
                s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
          metricWarningLogged = true
        }
        Map.empty
      }

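    // Only the stats from the first EventTimeWatermarkExec node that has seen any data are
    // used, merged with the current watermark timestamp computed above.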
    val eventTimeStats = lastExecution.executedPlan.collect {
      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
        val stats = e.eventTimeStats.value
        Map(
          "max" -> stats.max,
          "min" -> stats.min,
          "avg" -> stats.avg).mapValues(formatTimestamp)
    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp

    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
  }

  /** Records the duration of running `body` for the next query progress update. */
  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = triggerClock.getTimeMillis()
    val result = body
    val endTime = triggerClock.getTimeMillis()
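    // Guard against a clock that moves backwards between the two reads (e.g. a manual clock
    // in tests): never record a negative duration.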
    val timeTaken = math.max(endTime - startTime, 0)

    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
    logDebug(s"$triggerDetailKey took $timeTaken ms")
    result
  }

  private def formatTimestamp(millis: Long): String = {
    timestampFormat.format(new Date(millis))
  }

  /** Updates the message returned in `status`. */
  protected def updateStatusMessage(message: String): Unit = {
    currentStatus = currentStatus.copy(message = message)
  }
}