/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils

object RDDConversions {

  /**
   * Convert the fields of each Product (e.g. a case class or tuple) into the
   * types Catalyst expects. Note that the returned iterator reuses a single
   * mutable row, so callers must copy a row before buffering it.
   */
  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = outputTypes.length
      val mutableRow = new GenericInternalRow(numColumns)
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r.productElement(i))
          i += 1
        }

        mutableRow
      }
    }
  }

  /**
   * Convert the objects inside each Row into the types Catalyst expects.
   * As above, the returned iterator reuses a single mutable row.
   */
  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = outputTypes.length
      val mutableRow = new GenericInternalRow(numColumns)
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r(i))
          i += 1
        }

        mutableRow
      }
    }
  }
}

object ExternalRDD {

  def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
    CatalystSerde.serialize[T](externalRdd)
  }
}
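// ---------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original file). It shows
// how ExternalRDD.apply above wraps an RDD of JVM objects into a logical
// plan; the object name, method name, and `spark` parameter are assumptions.
// ---------------------------------------------------------------------------
private[execution] object ExternalRDDUsageExample {
  def plan(spark: SparkSession): LogicalPlan = {
    import spark.implicits._  // brings the implicit Encoder[Int] into scope
    val rdd: RDD[Int] = spark.sparkContext.parallelize(Seq(1, 2, 3))
    // Returns SerializeFromObject(ExternalRDD(...)): CatalystSerde.serialize
    // adds the node that converts the objects to InternalRows at execution.
    ExternalRDD(rdd, spark)
  }
}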
/** Logical plan node for scanning data from an RDD. */
case class ExternalRDD[T](
    outputObjAttr: Attribute,
    rdd: RDD[T])(session: SparkSession)
  extends LeafNode with ObjectProducer with MultiInstanceRelation {

  override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil

  override def newInstance(): ExternalRDD.this.type =
    ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]

  override def sameResult(plan: LogicalPlan): Boolean = {
    plan.canonicalized match {
      case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
      case _ => false
    }
  }

  override protected def stringArgs: Iterator[Any] = Iterator(output)

  @transient override lazy val statistics: Statistics = Statistics(
    // TODO: Instead of returning a default value here, find a way to return a meaningful size
    // estimate for RDDs. See PR 1238 for more discussions.
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )
}

/** Physical plan node for scanning data from an RDD. */
case class ExternalRDDScanExec[T](
    outputObjAttr: Attribute,
    rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val outputDataType = outputObjAttr.dataType
    rdd.mapPartitionsInternal { iter =>
      val outputObject = ObjectOperator.wrapObjectToRow(outputDataType)
      iter.map { value =>
        numOutputRows += 1
        outputObject(value)
      }
    }
  }

  override def simpleString: String = {
    s"Scan $nodeName${output.mkString("[", ",", "]")}"
  }
}

/** Logical plan node for scanning data from an RDD of InternalRow. */
case class LogicalRDD(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    outputPartitioning: Partitioning = UnknownPartitioning(0),
    outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
  extends LeafNode with MultiInstanceRelation {

  override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil

  override def newInstance(): LogicalRDD.this.type = {
    // Map each output attribute to a fresh instance, then rewrite the
    // partitioning and ordering expressions to reference the new attributes.
    val rewrite = output.zip(output.map(_.newInstance())).toMap

    val rewrittenPartitioning = outputPartitioning match {
      case p: Expression =>
        p.transform {
          case e: Attribute => rewrite.getOrElse(e, e)
        }.asInstanceOf[Partitioning]

      case p => p
    }

    val rewrittenOrdering = outputOrdering.map(_.transform {
      case e: Attribute => rewrite.getOrElse(e, e)
    }.asInstanceOf[SortOrder])

    LogicalRDD(
      output.map(rewrite),
      rdd,
      rewrittenPartitioning,
      rewrittenOrdering
    )(session).asInstanceOf[this.type]
  }

  override def sameResult(plan: LogicalPlan): Boolean = {
    plan.canonicalized match {
      case LogicalRDD(_, otherRDD, _, _) => rdd.id == otherRDD.id
      case _ => false
    }
  }

  override protected def stringArgs: Iterator[Any] = Iterator(output)

  @transient override lazy val statistics: Statistics = Statistics(
    // TODO: Instead of returning a default value here, find a way to return a meaningful size
    // estimate for RDDs. See PR 1238 for more discussions.
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )
}
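// ---------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original file). LogicalRDD
// is the node used when the input is already an RDD[InternalRow] with a known
// schema; a hand-built equivalent looks like this. The object, method, and
// parameter names are assumptions.
// ---------------------------------------------------------------------------
private[execution] object LogicalRDDUsageExample {
  def plan(
      spark: SparkSession,
      rows: RDD[InternalRow],
      schema: org.apache.spark.sql.types.StructType): LogicalPlan = {
    // toAttributes derives one AttributeReference per StructField; the
    // partitioning and ordering are left at their "unknown" defaults.
    LogicalRDD(schema.toAttributes, rows)(spark)
  }
}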
/** Physical plan node for scanning data from an RDD of InternalRow. */
case class RDDScanExec(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    override val nodeName: String,
    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      // Convert each incoming row to UnsafeRow; one projection is created per
      // partition and seeded with the partition index.
      val proj = UnsafeProjection.create(schema)
      proj.initialize(index)
      iter.map { r =>
        numOutputRows += 1
        proj(r)
      }
    }
  }

  override def simpleString: String = {
    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
  }
}
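// ---------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original file). During
// planning, Spark's strategies turn a LogicalRDD into an RDDScanExec along
// these lines; the object and method names here are assumptions.
// ---------------------------------------------------------------------------
private[execution] object RDDScanExecUsageExample {
  def plan(r: LogicalRDD): SparkPlan = {
    // The node name shows up in explain() output, e.g. "Scan ExistingRDD[...]";
    // partitioning and ordering are propagated from the logical node.
    RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering)
  }
}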