/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils

object RDDConversions {
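  /**
   * Convert an RDD of `Product`s (e.g. case classes or tuples) into an RDD of Catalyst
   * [[InternalRow]]s, applying the Catalyst converter for each declared output type.
   *
   * A minimal usage sketch, assuming a hypothetical `Point` case class and a `SparkContext`
   * named `sc` (`IntegerType` comes from `org.apache.spark.sql.types`):
   * {{{
   *   case class Point(x: Int, y: Int)
   *   val rows = RDDConversions.productToRowRdd(
   *     sc.parallelize(Seq(Point(1, 2))), Seq(IntegerType, IntegerType))
   * }}}
   */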
  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = outputTypes.length
      // Reuse a single mutable row across the iterator to avoid a per-record allocation;
      // consumers that need to retain rows must copy them.
      val mutableRow = new GenericInternalRow(numColumns)
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r.productElement(i))
          i += 1
        }

        mutableRow
      }
    }
  }

  /**
   * Convert the objects inside Row into the types Catalyst expects.
   */
  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = outputTypes.length
      val mutableRow = new GenericInternalRow(numColumns)
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r(i))
          i += 1
        }

        mutableRow
      }
    }
  }
}

object ExternalRDD {

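  /**
   * Wraps `rdd` in an object-producing [[ExternalRDD]] node and adds a serialization step, so
   * that downstream operators see Catalyst rows rather than raw domain objects.
   *
   * A minimal usage sketch, assuming a `SparkSession` named `spark` with its implicits in scope
   * (which provide the required `Encoder[String]`):
   * {{{
   *   import spark.implicits._
   *   val plan = ExternalRDD(spark.sparkContext.parallelize(Seq("a", "b")), spark)
   * }}}
   */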
  def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
    CatalystSerde.serialize[T](externalRdd)
  }
}

/** Logical plan node for scanning data from an RDD. */
case class ExternalRDD[T](
    outputObjAttr: Attribute,
    rdd: RDD[T])(session: SparkSession)
  extends LeafNode with ObjectProducer with MultiInstanceRelation {

  override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil

  override def newInstance(): ExternalRDD.this.type =
    ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]

  override def sameResult(plan: LogicalPlan): Boolean = {
    plan.canonicalized match {
      case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
      case _ => false
    }
  }

  override protected def stringArgs: Iterator[Any] = Iterator(output)

  @transient override lazy val statistics: Statistics = Statistics(
    // TODO: Instead of returning a default value here, find a way to return a meaningful size
    // estimate for RDDs. See PR 1238 for more discussion.
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )
}

/** Physical plan node for scanning data from an RDD. */
case class ExternalRDDScanExec[T](
    outputObjAttr: Attribute,
    rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

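  // `ObjectOperator.wrapObjectToRow` builds a projection that stores each emitted object in a
  // single-field row of the output attribute's data type.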
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val outputDataType = outputObjAttr.dataType
    rdd.mapPartitionsInternal { iter =>
      val outputObject = ObjectOperator.wrapObjectToRow(outputDataType)
      iter.map { value =>
        numOutputRows += 1
        outputObject(value)
      }
    }
  }

  override def simpleString: String = {
    s"Scan $nodeName${output.mkString("[", ",", "]")}"
  }
}

/** Logical plan node for scanning data from an RDD of InternalRow. */
case class LogicalRDD(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    outputPartitioning: Partitioning = UnknownPartitioning(0),
    outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
  extends LeafNode with MultiInstanceRelation {

  override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil

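  // newInstance() must generate fresh expression IDs for the output attributes (e.g. so the
  // relation can appear twice in a self-join). The partitioning and ordering still reference
  // the old attributes, so both are rewritten to point at the new instances.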
  override def newInstance(): LogicalRDD.this.type = {
    val rewrite = output.zip(output.map(_.newInstance())).toMap

    val rewrittenPartitioning = outputPartitioning match {
      case p: Expression =>
        p.transform {
          case e: Attribute => rewrite.getOrElse(e, e)
        }.asInstanceOf[Partitioning]

      case p => p
    }

    val rewrittenOrdering = outputOrdering.map(_.transform {
      case e: Attribute => rewrite.getOrElse(e, e)
    }.asInstanceOf[SortOrder])

    LogicalRDD(
      output.map(rewrite),
      rdd,
      rewrittenPartitioning,
      rewrittenOrdering
    )(session).asInstanceOf[this.type]
  }

  override def sameResult(plan: LogicalPlan): Boolean = {
    plan.canonicalized match {
      case LogicalRDD(_, otherRDD, _, _) => rdd.id == otherRDD.id
      case _ => false
    }
  }

  override protected def stringArgs: Iterator[Any] = Iterator(output)

  @transient override lazy val statistics: Statistics = Statistics(
    // TODO: Instead of returning a default value here, find a way to return a meaningful size
    // estimate for RDDs. See PR 1238 for more discussion.
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )
}

/** Physical plan node for scanning data from an RDD of InternalRow. */
case class RDDScanExec(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    override val nodeName: String,
    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

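  // One UnsafeProjection is created per partition; initializing it with the partition index
  // seeds any non-deterministic expressions consistently within that partition.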
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      val proj = UnsafeProjection.create(schema)
      proj.initialize(index)
      iter.map { r =>
        numOutputRows += 1
        proj(r)
      }
    }
  }

  override def simpleString: String = {
    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
  }
}