/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}

import scala.collection.mutable
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock

/**
 * Responsible for continually reporting statistics about the amount of data processed as well
 * as the latency of a streaming query. This trait is designed to be mixed into
 * [[StreamExecution]], which is responsible for calling `startTrigger` and `finishTrigger`
 * at the appropriate times. Additionally, the status can be updated with `updateStatusMessage`
 * to allow reporting on the stream's current state (e.g. "Fetching more data"). A rough sketch
 * of the expected call sequence appears in the comment preceding `finishTrigger`.
 */
trait ProgressReporter extends Logging {

  case class ExecutionStats(
    inputRows: Map[Source, Long],
    stateOperators: Seq[StateOperatorProgress],
    eventTimeStats: Map[String, String])

  // Internal state of the stream, required for computing metrics.
  protected def id: UUID
  protected def runId: UUID
  protected def name: String
  protected def triggerClock: Clock
  protected def logicalPlan: LogicalPlan
  protected def lastExecution: QueryExecution
  protected def newData: Map[Source, DataFrame]
  protected def availableOffsets: StreamProgress
  protected def committedOffsets: StreamProgress
  protected def sources: Seq[Source]
  protected def sink: Sink
  protected def offsetSeqMetadata: OffsetSeqMetadata
  protected def currentBatchId: Long
  protected def sparkSession: SparkSession
  protected def postEvent(event: StreamingQueryListener.Event): Unit

  // Local timestamps and counters.
  private var currentTriggerStartTimestamp = -1L
  private var currentTriggerEndTimestamp = -1L
  // TODO: Restore this from the checkpoint when possible.
  private var lastTriggerStartTimestamp = -1L
  private val currentDurationsMs = new mutable.HashMap[String, Long]()

  /** Flag that signals whether any errors with input metrics have already been logged. */
  private var metricWarningLogged: Boolean = false

  /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()

  private val noDataProgressEventInterval =
    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

  // The timestamp at which we last reported an event that had no input data
  private var lastNoDataProgressEventTime = Long.MinValue

  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

  @volatile
  protected var currentStatus: StreamingQueryStatus = {
    new StreamingQueryStatus(
      message = "Initializing StreamExecution",
      isDataAvailable = false,
      isTriggerActive = false)
  }

  /** Returns the current status of the query. */
  def status: StreamingQueryStatus = currentStatus

  /** Returns an array containing the most recent query progress updates. */
  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
    progressBuffer.toArray
  }

  /** Returns the most recent query progress update or null if there were no progress updates. */
  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
    progressBuffer.lastOption.orNull
  }

  /** Begins recording statistics about query progress for a given trigger. */
  protected def startTrigger(): Unit = {
    logDebug("Starting Trigger Calculation")
    lastTriggerStartTimestamp = currentTriggerStartTimestamp
    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
    currentStatus = currentStatus.copy(isTriggerActive = true)
    currentDurationsMs.clear()
  }

  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
    progressBuffer.synchronized {
      progressBuffer += newProgress
      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
        progressBuffer.dequeue()
      }
    }
    postEvent(new QueryProgressEvent(newProgress))
    logInfo(s"Streaming query made progress: $newProgress")
  }
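
  // Illustrative sketch (an assumption by this edit, not part of the original code): an
  // implementor such as StreamExecution is expected to drive this trait once per trigger
  // roughly as follows. The duration keys and the `fetchNewData()` / `runBatch()` helpers
  // are hypothetical placeholders.
  //
  //   startTrigger()
  //   updateStatusMessage("Fetching new data")
  //   val hasNewData = reportTimeTaken("getOffset") { fetchNewData() }
  //   reportTimeTaken("triggerExecution") { runBatch() }
  //   finishTrigger(hasNewData)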

  /** Finalizes the query progress and adds it to the list of recent status updates. */
  protected def finishTrigger(hasNewData: Boolean): Unit = {
    currentTriggerEndTimestamp = triggerClock.getTimeMillis()

    val executionStats = extractExecutionStats(hasNewData)
    val processingTimeSec =
      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000

    val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000
    } else {
      Double.NaN
    }
    logDebug(s"Execution stats: $executionStats")

    val sourceProgress = sources.map { source =>
      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
      new SourceProgress(
        description = source.toString,
        startOffset = committedOffsets.get(source).map(_.json).orNull,
        endOffset = availableOffsets.get(source).map(_.json).orNull,
        numInputRows = numRecords,
        inputRowsPerSecond = numRecords / inputTimeSec,
        processedRowsPerSecond = numRecords / processingTimeSec
      )
    }
    val sinkProgress = new SinkProgress(sink.toString)

    val newProgress = new StreamingQueryProgress(
      id = id,
      runId = runId,
      name = name,
      timestamp = formatTimestamp(currentTriggerStartTimestamp),
      batchId = currentBatchId,
      durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava),
      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
      stateOperators = executionStats.stateOperators.toArray,
      sources = sourceProgress.toArray,
      sink = sinkProgress)

    if (hasNewData) {
      // Reset lastNoDataProgressEventTime if we processed any data
      lastNoDataProgressEventTime = Long.MinValue
      updateProgress(newProgress)
    } else {
      val now = triggerClock.getTimeMillis()
      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
        lastNoDataProgressEventTime = now
        updateProgress(newProgress)
      }
    }

    currentStatus = currentStatus.copy(isTriggerActive = false)
  }
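
  // Worked example (illustrative numbers, not from the original code): if the previous
  // trigger started at t = 10000 ms, this trigger started at t = 12000 ms and ended at
  // t = 12500 ms, and a source read 1000 rows, then inputTimeSec = 2.0 and
  // processingTimeSec = 0.5, so inputRowsPerSecond = 500.0 and
  // processedRowsPerSecond = 2000.0. When hasNewData is false, the progress update above is
  // only emitted if at least noDataProgressEventInterval milliseconds have elapsed since
  // lastNoDataProgressEventTime.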

  /** Extracts statistics about stateful operators from the executed query plan. */
  private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
    if (lastExecution == null) return Nil
    // lastExecution could belong to one of the previous triggers if `!hasNewData`.
    // Walking the plan again should be inexpensive.
    val stateNodes = lastExecution.executedPlan.collect {
      case p if p.isInstanceOf[StateStoreSaveExec] => p
    }
    stateNodes.map { node =>
      val numRowsUpdated = if (hasNewData) {
        node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
      } else {
        0L
      }
      new StateOperatorProgress(
        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
        numRowsUpdated = numRowsUpdated)
    }
  }

  /** Extracts statistics from the most recent query execution. */
  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
    val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
    val watermarkTimestamp =
      if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
      else Map.empty[String, String]

    // SPARK-19378: Still report metrics even though no data was processed while reporting
    // progress.
    val stateOperators = extractStateOperatorMetrics(hasNewData)

    if (!hasNewData) {
      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
    }

    // We want to associate execution plan leaves to sources that generate them, so that we
    // can match their metrics (e.g. numOutputRows) to the sources. To do this we do the
    // following. Consider the translation from the streaming logical plan to the final
    // executed plan.
    //
    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    //
    // 1. We keep track of streaming sources associated with each leaf in the trigger's
    //    logical plan.
    //    - Each logical plan leaf will be associated with a single streaming source.
    //    - There can be multiple logical plan leaves associated with a streaming source.
    //    - There can be leaves not associated with any streaming source, because they were
    //      generated from a batch source (e.g. stream-batch joins)
    //
    // 2. Assuming that the executed plan has the same number of leaves in the same order as
    //    that of the trigger logical plan, we associate executed plan leaves with the
    //    corresponding streaming sources.
    //
    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    //    (An illustrative example follows this method.)
    //
    val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
    }
    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    val numInputRows: Map[Source, Long] =
      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
        val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
        }
        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
          source -> numRows
        }
        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
      } else {
        if (!metricWarningLogged) {
          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
          logWarning(
            "Could not report metrics as the number of leaves in the trigger logical plan did" +
            s" not match that of the execution plan:\n" +
            s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
            s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
          metricWarningLogged = true
        }
        Map.empty
      }

    val eventTimeStats = lastExecution.executedPlan.collect {
      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
        val stats = e.eventTimeStats.value
        Map(
          "max" -> stats.max,
          "min" -> stats.min,
          "avg" -> stats.avg).mapValues(formatTimestamp)
    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp

    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
  }
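
  // Illustrative example (hypothetical plan and identifiers, not from the original code) of
  // the leaf-to-source matching performed in extractExecutionStats above. For a stream-batch
  // join with one streaming source `kafkaSource` and one static relation:
  //
  //   trigger logical plan leaves:  [kafkaStreamingRelation, staticParquetRelation]
  //   executed plan leaves:         [kafkaScanExec,          parquetScanExec]
  //
  // Only the first pair maps back to a streaming source, so numInputRows would be
  //   Map(kafkaSource -> value of kafkaScanExec's "numOutputRows" metric)
  // and the static side contributes nothing to the per-source input row counts.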

  /** Records the duration of running `body` for the next query progress update. */
  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = triggerClock.getTimeMillis()
    val result = body
    val endTime = triggerClock.getTimeMillis()
    val timeTaken = math.max(endTime - startTime, 0)

    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
    logDebug(s"$triggerDetailKey took $timeTaken ms")
    result
  }

  private def formatTimestamp(millis: Long): String = {
    timestampFormat.format(new Date(millis))
  }

  /** Updates the message returned in `status`. */
  protected def updateStatusMessage(message: String): Unit = {
    currentStatus = currentStatus.copy(message = message)
  }
}