1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.catalyst.trees
19
20import java.util.UUID
21
22import scala.collection.Map
23import scala.reflect.ClassTag
24
25import org.apache.commons.lang3.ClassUtils
26import org.json4s.JsonAST._
27import org.json4s.JsonDSL._
28import org.json4s.jackson.JsonMethods._
29
30import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
31import org.apache.spark.sql.catalyst.FunctionIdentifier
32import org.apache.spark.sql.catalyst.ScalaReflection._
33import org.apache.spark.sql.catalyst.TableIdentifier
34import org.apache.spark.sql.catalyst.errors._
35import org.apache.spark.sql.catalyst.expressions._
36import org.apache.spark.sql.catalyst.plans.JoinType
37import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
38import org.apache.spark.sql.types._
39import org.apache.spark.storage.StorageLevel
40import org.apache.spark.util.Utils
41
/**
 * Mutable integer box used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given
 * number: the traversal decrements `i` in place as it moves past nodes.
 */
private class MutableInt(var i: Int)
44
/**
 * Position information attached to a [[TreeNode]].
 *
 * @param line the line number, if known
 * @param startPosition the start position, if known
 */
case class Origin(
  line: Option[Int] = None,
  startPosition: Option[Int] = None)

/**
 * Provides a location for TreeNodes to ask about the context of their origin.  For example, which
 * line of code is currently being parsed.
 *
 * The current origin is stored in a `ThreadLocal`, so each thread sees its own value.
 */
object CurrentOrigin {
  private val value = new ThreadLocal[Origin]() {
    override def initialValue: Origin = Origin()
  }

  /** Returns the origin currently in effect for the calling thread. */
  def get: Origin = value.get()

  /** Replaces the origin currently in effect for the calling thread. */
  def set(o: Origin): Unit = value.set(o)

  /** Clears the current origin back to an empty [[Origin]]. */
  def reset(): Unit = value.set(Origin())

  /** Updates the line and start position of the current origin. */
  def setPosition(line: Int, start: Int): Unit = {
    value.set(
      value.get.copy(line = Some(line), startPosition = Some(start)))
  }

  /**
   * Evaluates `f` with `o` installed as the current origin.
   *
   * The origin that was in effect before the call is restored afterwards, even when `f` throws.
   * Restoring (rather than resetting to an empty origin) keeps nested `withOrigin` calls from
   * wiping out the origin of the enclosing call.
   *
   * @param o the origin to use while `f` runs
   * @param f the computation to evaluate
   * @return the result of `f`
   */
  def withOrigin[A](o: Origin)(f: => A): A = {
    val previous = get
    set(o)
    try f finally { set(previous) }
  }
}
75
76// scalastyle:off
77abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
78// scalastyle:on
79  self: BaseType =>
80
  /** The [[CurrentOrigin]] value that was in effect when this node was constructed. */
  val origin: Origin = CurrentOrigin.get
82
  /**
   * Returns a Seq of the children of this node.
   * Children should not change: [[containsChild]] is computed once from this sequence and cached,
   * so the optimization is only valid if the children are immutable.
   */
  def children: Seq[BaseType]
88
  /** The [[children]] of this node as a Set, built once on first access for membership checks. */
  lazy val containsChild: Set[TreeNode[_]] = children.toSet
90
  // The product hash of this node is computed on first use and then cached.
  private lazy val _hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this)
  override def hashCode(): Int = _hashCode
93
94  /**
95   * Faster version of equality which short-circuits when two treeNodes are the same instance.
96   * We don't just override Object.equals, as doing so prevents the scala compiler from
97   * generating case class `equals` methods
98   */
99  def fastEquals(other: TreeNode[_]): Boolean = {
100    this.eq(other) || this == other
101  }
102
103  /**
104   * Find the first [[TreeNode]] that satisfies the condition specified by `f`.
105   * The condition is recursively applied to this node and all of its children (pre-order).
106   */
107  def find(f: BaseType => Boolean): Option[BaseType] = if (f(this)) {
108    Some(this)
109  } else {
110    children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) }
111  }
112
113  /**
114   * Runs the given function on this node and then recursively on [[children]].
115   * @param f the function to be applied to each node in the tree.
116   */
117  def foreach(f: BaseType => Unit): Unit = {
118    f(this)
119    children.foreach(_.foreach(f))
120  }
121
122  /**
123   * Runs the given function recursively on [[children]] then on this node.
124   * @param f the function to be applied to each node in the tree.
125   */
126  def foreachUp(f: BaseType => Unit): Unit = {
127    children.foreach(_.foreachUp(f))
128    f(this)
129  }
130
131  /**
132   * Returns a Seq containing the result of applying the given function to each
133   * node in this tree in a preorder traversal.
134   * @param f the function to be applied.
135   */
136  def map[A](f: BaseType => A): Seq[A] = {
137    val ret = new collection.mutable.ArrayBuffer[A]()
138    foreach(ret += f(_))
139    ret
140  }
141
142  /**
143   * Returns a Seq by applying a function to all nodes in this tree and using the elements of the
144   * resulting collections.
145   */
146  def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
147    val ret = new collection.mutable.ArrayBuffer[A]()
148    foreach(ret ++= f(_))
149    ret
150  }
151
152  /**
153   * Returns a Seq containing the result of applying a partial function to all elements in this
154   * tree on which the function is defined.
155   */
156  def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
157    val ret = new collection.mutable.ArrayBuffer[B]()
158    val lifted = pf.lift
159    foreach(node => lifted(node).foreach(ret.+=))
160    ret
161  }
162
163  /**
164   * Returns a Seq containing the leaves in this tree.
165   */
166  def collectLeaves(): Seq[BaseType] = {
167    this.collect { case p if p.children.isEmpty => p }
168  }
169
170  /**
171   * Finds and returns the first [[TreeNode]] of the tree for which the given partial function
172   * is defined (pre-order), and applies the partial function to it.
173   */
174  def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = {
175    val lifted = pf.lift
176    lifted(this).orElse {
177      children.foldLeft(Option.empty[B]) { (l, r) => l.orElse(r.collectFirst(pf)) }
178    }
179  }
180
181  /**
182   * Efficient alternative to `productIterator.map(f).toArray`.
183   */
184  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
185    val arr = Array.ofDim[B](productArity)
186    var i = 0
187    while (i < arr.length) {
188      arr(i) = f(productElement(i))
189      i += 1
190    }
191    arr
192  }
193
194  /**
195   * Returns a copy of this node with the children replaced.
196   * TODO: Validate somewhere (in debug mode?) that children are ordered correctly.
197   */
198  def withNewChildren(newChildren: Seq[BaseType]): BaseType = {
199    assert(newChildren.size == children.size, "Incorrect number of children")
200    var changed = false
201    val remainingNewChildren = newChildren.toBuffer
202    val remainingOldChildren = children.toBuffer
203    val newArgs = mapProductIterator {
204      case s: StructType => s // Don't convert struct types to some other type of Seq[StructField]
205      // Handle Seq[TreeNode] in TreeNode parameters.
206      case s: Seq[_] => s.map {
207        case arg: TreeNode[_] if containsChild(arg) =>
208          val newChild = remainingNewChildren.remove(0)
209          val oldChild = remainingOldChildren.remove(0)
210          if (newChild fastEquals oldChild) {
211            oldChild
212          } else {
213            changed = true
214            newChild
215          }
216        case nonChild: AnyRef => nonChild
217        case null => null
218      }
219      case m: Map[_, _] => m.mapValues {
220        case arg: TreeNode[_] if containsChild(arg) =>
221          val newChild = remainingNewChildren.remove(0)
222          val oldChild = remainingOldChildren.remove(0)
223          if (newChild fastEquals oldChild) {
224            oldChild
225          } else {
226            changed = true
227            newChild
228          }
229        case nonChild: AnyRef => nonChild
230        case null => null
231      }.view.force // `mapValues` is lazy and we need to force it to materialize
232      case arg: TreeNode[_] if containsChild(arg) =>
233        val newChild = remainingNewChildren.remove(0)
234        val oldChild = remainingOldChildren.remove(0)
235        if (newChild fastEquals oldChild) {
236          oldChild
237        } else {
238          changed = true
239          newChild
240        }
241      case nonChild: AnyRef => nonChild
242      case null => null
243    }
244
245    if (changed) makeCopy(newArgs) else this
246  }
247
248  /**
249   * Returns a copy of this node where `rule` has been recursively applied to the tree.
250   * When `rule` does not apply to a given node it is left unchanged.
251   * Users should not expect a specific directionality. If a specific directionality is needed,
252   * transformDown or transformUp should be used.
253   *
254   * @param rule the function use to transform this nodes children
255   */
256  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
257    transformDown(rule)
258  }
259
260  /**
261   * Returns a copy of this node where `rule` has been recursively applied to it and all of its
262   * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
263   *
264   * @param rule the function used to transform this nodes children
265   */
266  def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
267    val afterRule = CurrentOrigin.withOrigin(origin) {
268      rule.applyOrElse(this, identity[BaseType])
269    }
270
271    // Check if unchanged and then possibly return old copy to avoid gc churn.
272    if (this fastEquals afterRule) {
273      mapChildren(_.transformDown(rule))
274    } else {
275      afterRule.mapChildren(_.transformDown(rule))
276    }
277  }
278
279  /**
280   * Returns a copy of this node where `rule` has been recursively applied first to all of its
281   * children and then itself (post-order). When `rule` does not apply to a given node, it is left
282   * unchanged.
283   *
284   * @param rule the function use to transform this nodes children
285   */
286  def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
287    val afterRuleOnChildren = mapChildren(_.transformUp(rule))
288    if (this fastEquals afterRuleOnChildren) {
289      CurrentOrigin.withOrigin(origin) {
290        rule.applyOrElse(this, identity[BaseType])
291      }
292    } else {
293      CurrentOrigin.withOrigin(origin) {
294        rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
295      }
296    }
297  }
298
299  /**
300   * Returns a copy of this node where `f` has been applied to all the nodes children.
301   */
302  def mapChildren(f: BaseType => BaseType): BaseType = {
303    if (children.nonEmpty) {
304      var changed = false
305      val newArgs = mapProductIterator {
306        case arg: TreeNode[_] if containsChild(arg) =>
307          val newChild = f(arg.asInstanceOf[BaseType])
308          if (!(newChild fastEquals arg)) {
309            changed = true
310            newChild
311          } else {
312            arg
313          }
314        case Some(arg: TreeNode[_]) if containsChild(arg) =>
315          val newChild = f(arg.asInstanceOf[BaseType])
316          if (!(newChild fastEquals arg)) {
317            changed = true
318            Some(newChild)
319          } else {
320            Some(arg)
321          }
322        case m: Map[_, _] => m.mapValues {
323          case arg: TreeNode[_] if containsChild(arg) =>
324            val newChild = f(arg.asInstanceOf[BaseType])
325            if (!(newChild fastEquals arg)) {
326              changed = true
327              newChild
328            } else {
329              arg
330            }
331          case other => other
332        }.view.force // `mapValues` is lazy and we need to force it to materialize
333        case d: DataType => d // Avoid unpacking Structs
334        case args: Traversable[_] => args.map {
335          case arg: TreeNode[_] if containsChild(arg) =>
336            val newChild = f(arg.asInstanceOf[BaseType])
337            if (!(newChild fastEquals arg)) {
338              changed = true
339              newChild
340            } else {
341              arg
342            }
343          case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
344            val newChild1 = f(arg1.asInstanceOf[BaseType])
345            val newChild2 = f(arg2.asInstanceOf[BaseType])
346            if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
347              changed = true
348              (newChild1, newChild2)
349            } else {
350              tuple
351            }
352          case other => other
353        }
354        case nonChild: AnyRef => nonChild
355        case null => null
356      }
357      if (changed) makeCopy(newArgs) else this
358    } else {
359      this
360    }
361  }
362
  /**
   * Args to the constructor that should be copied, but not transformed.
   * These are appended to the transformed args automatically by makeCopy.
   *
   * @return the extra constructor arguments; `Nil` unless overridden
   */
  protected def otherCopyArgs: Seq[AnyRef] = Nil
369
370  /**
371   * Creates a copy of this type of tree node after a transformation.
372   * Must be overridden by child classes that have constructor arguments
373   * that are not present in the productIterator.
374   * @param newArgs the new product arguments.
375   */
376  def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") {
377    // Skip no-arg constructors that are just there for kryo.
378    val ctors = getClass.getConstructors.filter(_.getParameterTypes.size != 0)
379    if (ctors.isEmpty) {
380      sys.error(s"No valid constructor for $nodeName")
381    }
382    val allArgs: Array[AnyRef] = if (otherCopyArgs.isEmpty) {
383      newArgs
384    } else {
385      newArgs ++ otherCopyArgs
386    }
387    val defaultCtor = ctors.find { ctor =>
388      if (ctor.getParameterTypes.length != allArgs.length) {
389        false
390      } else if (allArgs.contains(null)) {
391        // if there is a `null`, we can't figure out the class, therefore we should just fallback
392        // to older heuristic
393        false
394      } else {
395        val argsArray: Array[Class[_]] = allArgs.map(_.getClass)
396        ClassUtils.isAssignable(argsArray, ctor.getParameterTypes, true /* autoboxing */)
397      }
398    }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to older heuristic
399
400    try {
401      CurrentOrigin.withOrigin(origin) {
402        defaultCtor.newInstance(allArgs.toArray: _*).asInstanceOf[BaseType]
403      }
404    } catch {
405      case e: java.lang.IllegalArgumentException =>
406        throw new TreeNodeException(
407          this,
408          s"""
409             |Failed to copy node.
410             |Is otherCopyArgs specified correctly for $nodeName.
411             |Exception message: ${e.getMessage}
412             |ctor: $defaultCtor?
413             |types: ${newArgs.map(_.getClass).mkString(", ")}
414             |args: ${newArgs.mkString(", ")}
415           """.stripMargin)
416    }
417  }
418
419  /**
420   * Returns the name of this type of TreeNode.  Defaults to the class name.
421   * Note that we remove the "Exec" suffix for physical operators here.
422   */
423  def nodeName: String = getClass.getSimpleName.replaceAll("Exec$", "")
424
  /**
   * The arguments that should be included in the arg string.  Defaults to the `productIterator`.
   * Consumed by [[argString]], which filters out child nodes and empty values.
   */
  protected def stringArgs: Iterator[Any] = productIterator
429
  // `children` and `innerChildren` combined into one Set; argString uses it to recognise
  // arguments that are child nodes and leave them out of the argument string.
  private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
431
432  /** Returns a string representing the arguments to this node, minus any children */
433  def argString: String = stringArgs.flatMap {
434    case tn: TreeNode[_] if allChildren.contains(tn) => Nil
435    case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
436    case Some(tn: TreeNode[_]) => tn.simpleString :: Nil
437    case tn: TreeNode[_] => tn.simpleString :: Nil
438    case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
439    case iter: Iterable[_] if iter.isEmpty => Nil
440    case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil
441    case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil
442    case array: Array[_] if array.isEmpty => Nil
443    case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil
444    case null => Nil
445    case None => Nil
446    case Some(null) => Nil
447    case Some(any) => any :: Nil
448    case other => other :: Nil
449  }.mkString(", ")
450
451  /** ONE line description of this node. */
452  def simpleString: String = s"$nodeName $argString".trim
453
  /**
   * ONE line description of this node with more information.
   * Used by [[generateTreeString]] instead of [[simpleString]] when `verbose` is true.
   */
  def verboseString: String
456
  /** The string form of a node is the rendering of its whole tree, see [[treeString]]. */
  override def toString: String = treeString
458
459  /** Returns a string representation of the nodes in this tree */
460  def treeString: String = treeString(verbose = true)
461
462  def treeString(verbose: Boolean): String = {
463    generateTreeString(0, Nil, new StringBuilder, verbose).toString
464  }
465
466  /**
467   * Returns a string representation of the nodes in this tree, where each operator is numbered.
468   * The numbers can be used with [[TreeNode.apply]] to easily access specific subtrees.
469   *
470   * The numbers are based on depth-first traversal of the tree (with innerChildren traversed first
471   * before children).
472   */
473  def numberedTreeString: String =
474    treeString.split("\n").zipWithIndex.map { case (line, i) => f"$i%02d $line" }.mkString("\n")
475
476  /**
477   * Returns the tree node at the specified number, used primarily for interactive debugging.
478   * Numbers for each node can be found in the [[numberedTreeString]].
479   *
480   * Note that this cannot return BaseType because logical plan's plan node might return
481   * physical plan for innerChildren, e.g. in-memory relation logical plan node has a reference
482   * to the physical plan node it is referencing.
483   */
484  def apply(number: Int): TreeNode[_] = getNodeNumbered(new MutableInt(number)).orNull
485
486  /**
487   * Returns the tree node at the specified number, used primarily for interactive debugging.
488   * Numbers for each node can be found in the [[numberedTreeString]].
489   *
490   * This is a variant of [[apply]] that returns the node as BaseType (if the type matches).
491   */
492  def p(number: Int): BaseType = apply(number).asInstanceOf[BaseType]
493
494  private def getNodeNumbered(number: MutableInt): Option[TreeNode[_]] = {
495    if (number.i < 0) {
496      None
497    } else if (number.i == 0) {
498      Some(this)
499    } else {
500      number.i -= 1
501      // Note that this traversal order must be the same as numberedTreeString.
502      innerChildren.map(_.getNodeNumbered(number)).find(_ != None).getOrElse {
503        children.map(_.getNodeNumbered(number)).find(_ != None).flatten
504      }
505    }
506  }
507
  /**
   * All the nodes that should be shown as an inner nested tree of this node.
   * For example, this can be used to show sub-queries. Empty unless overridden.
   */
  protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
513
514  /**
515   * Appends the string represent of this node and its children to the given StringBuilder.
516   *
517   * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
518   * depth `i + 1` is the last child of its own parent node.  The depth of the root node is 0, and
519   * `lastChildren` for the root node should be empty.
520   *
521   * Note that this traversal (numbering) order must be the same as [[getNodeNumbered]].
522   */
523  def generateTreeString(
524      depth: Int,
525      lastChildren: Seq[Boolean],
526      builder: StringBuilder,
527      verbose: Boolean,
528      prefix: String = ""): StringBuilder = {
529
530    if (depth > 0) {
531      lastChildren.init.foreach { isLast =>
532        builder.append(if (isLast) "   " else ":  ")
533      }
534      builder.append(if (lastChildren.last) "+- " else ":- ")
535    }
536
537    builder.append(prefix)
538    builder.append(if (verbose) verboseString else simpleString)
539    builder.append("\n")
540
541    if (innerChildren.nonEmpty) {
542      innerChildren.init.foreach(_.generateTreeString(
543        depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose))
544      innerChildren.last.generateTreeString(
545        depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose)
546    }
547
548    if (children.nonEmpty) {
549      children.init.foreach(_.generateTreeString(
550        depth + 1, lastChildren :+ false, builder, verbose, prefix))
551      children.last.generateTreeString(
552        depth + 1, lastChildren :+ true, builder, verbose, prefix)
553    }
554
555    builder
556  }
557
558  /**
559   * Returns a 'scala code' representation of this `TreeNode` and its children.  Intended for use
560   * when debugging where the prettier toString function is obfuscating the actual structure. In the
561   * case of 'pure' `TreeNodes` that only contain primitives and other TreeNodes, the result can be
562   * pasted in the REPL to build an equivalent Tree.
563   */
564  def asCode: String = {
565    val args = productIterator.map {
566      case tn: TreeNode[_] => tn.asCode
567      case s: String => "\"" + s + "\""
568      case other => other.toString
569    }
570    s"$nodeName(${args.mkString(",")})"
571  }
572
573  def toJSON: String = compact(render(jsonValue))
574
575  def prettyJson: String = pretty(render(jsonValue))
576
577  private def jsonValue: JValue = {
578    val jsonValues = scala.collection.mutable.ArrayBuffer.empty[JValue]
579
580    def collectJsonValue(tn: BaseType): Unit = {
581      val jsonFields = ("class" -> JString(tn.getClass.getName)) ::
582        ("num-children" -> JInt(tn.children.length)) :: tn.jsonFields
583      jsonValues += JObject(jsonFields)
584      tn.children.foreach(collectJsonValue)
585    }
586
587    collectJsonValue(this)
588    jsonValues
589  }
590
591  protected def jsonFields: List[JField] = {
592    val fieldNames = getConstructorParameterNames(getClass)
593    val fieldValues = productIterator.toSeq ++ otherCopyArgs
594    assert(fieldNames.length == fieldValues.length, s"${getClass.getSimpleName} fields: " +
595      fieldNames.mkString(", ") + s", values: " + fieldValues.map(_.toString).mkString(", "))
596
597    fieldNames.zip(fieldValues).map {
598      // If the field value is a child, then use an int to encode it, represents the index of
599      // this child in all children.
600      case (name, value: TreeNode[_]) if containsChild(value) =>
601        name -> JInt(children.indexOf(value))
602      case (name, value: Seq[BaseType]) if value.forall(containsChild) =>
603        name -> JArray(
604          value.map(v => JInt(children.indexOf(v.asInstanceOf[TreeNode[_]]))).toList
605        )
606      case (name, value) => name -> parseToJson(value)
607    }.toList
608  }
609
610  private def parseToJson(obj: Any): JValue = obj match {
611    case b: Boolean => JBool(b)
612    case b: Byte => JInt(b.toInt)
613    case s: Short => JInt(s.toInt)
614    case i: Int => JInt(i)
615    case l: Long => JInt(l)
616    case f: Float => JDouble(f)
617    case d: Double => JDouble(d)
618    case b: BigInt => JInt(b)
619    case null => JNull
620    case s: String => JString(s)
621    case u: UUID => JString(u.toString)
622    case dt: DataType => dt.jsonValue
623    // SPARK-17356: In usage of mllib, Metadata may store a huge vector of data, transforming
624    // it to JSON may trigger OutOfMemoryError.
625    case m: Metadata => Metadata.empty.jsonValue
626    case clazz: Class[_] => JString(clazz.getName)
627    case s: StorageLevel =>
628      ("useDisk" -> s.useDisk) ~ ("useMemory" -> s.useMemory) ~ ("useOffHeap" -> s.useOffHeap) ~
629        ("deserialized" -> s.deserialized) ~ ("replication" -> s.replication)
630    case n: TreeNode[_] => n.jsonValue
631    case o: Option[_] => o.map(parseToJson)
632    // Recursive scan Seq[TreeNode], Seq[Partitioning], Seq[DataType]
633    case t: Seq[_] if t.forall(_.isInstanceOf[TreeNode[_]]) ||
634      t.forall(_.isInstanceOf[Partitioning]) || t.forall(_.isInstanceOf[DataType]) =>
635      JArray(t.map(parseToJson).toList)
636    case t: Seq[_] if t.length > 0 && t.head.isInstanceOf[String] =>
637      JString(Utils.truncatedString(t, "[", ", ", "]"))
638    case t: Seq[_] => JNull
639    case m: Map[_, _] => JNull
640    // if it's a scala object, we can simply keep the full class path.
641    // TODO: currently if the class name ends with "$", we think it's a scala object, there is
642    // probably a better way to check it.
643    case obj if obj.getClass.getName.endsWith("$") => "object" -> obj.getClass.getName
644    case p: Product if shouldConvertToJson(p) =>
645      try {
646        val fieldNames = getConstructorParameterNames(p.getClass)
647        val fieldValues = p.productIterator.toSeq
648        assert(fieldNames.length == fieldValues.length)
649        ("product-class" -> JString(p.getClass.getName)) :: fieldNames.zip(fieldValues).map {
650          case (name, value) => name -> parseToJson(value)
651        }.toList
652      } catch {
653        case _: RuntimeException => null
654      }
655    case _ => JNull
656  }
657
658  private def shouldConvertToJson(product: Product): Boolean = product match {
659    case exprId: ExprId => true
660    case field: StructField => true
661    case id: TableIdentifier => true
662    case join: JoinType => true
663    case id: FunctionIdentifier => true
664    case spec: BucketSpec => true
665    case catalog: CatalogTable => true
666    case boundary: FrameBoundary => true
667    case frame: WindowFrame => true
668    case partition: Partitioning => true
669    case resource: FunctionResource => true
670    case broadcast: BroadcastMode => true
671    case table: CatalogTableType => true
672    case storage: CatalogStorageFormat => true
673    case _ => false
674  }
675}
676