1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.ml.linalg
19
20import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => JavaIterable}
21import java.util
22
23import scala.annotation.varargs
24import scala.collection.JavaConverters._
25
26import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
27
28import org.apache.spark.annotation.Since
29
/**
 * A numeric vector indexed by `Int` whose entries are `Double` values.
 *
 * @note Users should not implement this interface.
 */
@Since("2.0.0")
sealed trait Vector extends Serializable {

  /**
   * Size of the vector.
   */
  @Since("2.0.0")
  def size: Int

  /**
   * Returns the contents of this vector as a double array.
   */
  @Since("2.0.0")
  def toArray: Array[Double]

  /**
   * Value-based equality. Two vectors are equal when they have the same size and the same
   * nonzero entries, whatever their storage format. Anything that is not a [[Vector]] is
   * never equal to a vector.
   */
  override def equals(other: Any): Boolean = other match {
    case that: Vector if this.size == that.size =>
      (this, that) match {
        case (left: SparseVector, right: SparseVector) =>
          Vectors.equals(left.indices, left.values, right.indices, right.values)
        case (sp: SparseVector, dn: DenseVector) =>
          // A dense vector is viewed as a sparse one whose indices are 0 until size.
          Vectors.equals(sp.indices, sp.values, 0 until dn.size, dn.values)
        case (dn: DenseVector, sp: SparseVector) =>
          Vectors.equals(0 until dn.size, dn.values, sp.indices, sp.values)
        case _ =>
          util.Arrays.equals(this.toArray, that.toArray)
      }
    case _ =>
      // Either not a vector at all, or a vector of a different size.
      false
  }

  /**
   * Hash code derived from the vector size and its first 128 nonzero entries, combined in the
   * same fashion as `java.util.Arrays.hashCode`.
   */
  override def hashCode(): Int = {
    // Reference implementation only: leaving foreachActive through a nonlocal return is slow,
    // so subclasses are expected to provide an optimized override.
    var hash: Int = 31 + size
    var counted = 0
    this.foreachActive { (index, value) =>
      if (counted >= Vectors.MAX_HASH_NNZ) {
        return hash
      }
      // Explicit zeros are skipped so that sparse and dense representations hash alike.
      if (value != 0) {
        val bits = java.lang.Double.doubleToLongBits(value)
        hash = 31 * hash + index
        hash = 31 * hash + (bits ^ (bits >>> 32)).toInt
        counted += 1
      }
    }
    hash
  }

  /**
   * Exposes this vector as a breeze vector.
   */
  private[spark] def asBreeze: BV[Double]

  /**
   * Reads the element stored at position `i`.
   * @param i index
   */
  @Since("2.0.0")
  def apply(i: Int): Double = asBreeze(i)

  /**
   * Makes a deep copy of this vector. The default implementation throws
   * `NotImplementedError`.
   */
  @Since("2.0.0")
  def copy: Vector =
    throw new NotImplementedError(s"copy is not implemented for ${this.getClass}.")

  /**
   * Invokes `f` on every active element of this dense or sparse vector.
   *
   * @param f callback receiving the element index (an `Int`) as its first argument and the
   *          element value (a `Double`) as its second argument.
   */
  @Since("2.0.0")
  def foreachActive(f: (Int, Double) => Unit): Unit

  /**
   * Number of active entries.  An "active entry" is an element which is explicitly stored,
   * whatever its value.  Inactive entries always have value 0.
   */
  @Since("2.0.0")
  def numActives: Int

  /**
   * Number of nonzero elements, obtained by scanning the active values.
   */
  @Since("2.0.0")
  def numNonzeros: Int

  /**
   * Converts this vector to a sparse vector that stores no explicit zeros.
   */
  @Since("2.0.0")
  def toSparse: SparseVector

  /**
   * Converts this vector to a dense vector.
   */
  @Since("2.0.0")
  def toDense: DenseVector = new DenseVector(toArray)

  /**
   * Returns this vector in whichever of the dense and sparse formats needs less storage.
   */
  @Since("2.0.0")
  def compressed: Vector = {
    val nonzeros = numNonzeros
    // Dense storage costs 8 * size + 8 bytes; sparse storage costs 12 * nnz + 20 bytes.
    val sparseIsSmaller = 1.5 * (nonzeros + 1.0) < size
    if (sparseIsSmaller) toSparse else toDense
  }

  /**
   * Index of a maximal element; on ties the first maximal element wins.
   * Returns -1 if the vector has length 0.
   */
  @Since("2.0.0")
  def argmax: Int
}
168
/**
 * Factory methods and numeric helpers for [[org.apache.spark.ml.linalg.Vector]].
 * The object is not called `Vector` because Scala imports
 * [[scala.collection.immutable.Vector]] by default.
 */
@Since("2.0.0")
object Vectors {

  /**
   * Builds a dense vector from the given values.
   */
  @varargs
  @Since("2.0.0")
  def dense(firstValue: Double, otherValues: Double*): Vector = {
    val allValues = firstValue +: otherValues
    new DenseVector(allValues.toArray)
  }

  /**
   * Builds a dense vector backed by the given double array.
   */
  @Since("2.0.0")
  def dense(values: Array[Double]): Vector = new DenseVector(values)

  /**
   * Builds a sparse vector from an index array and a matching value array.
   *
   * @param size vector size.
   * @param indices index array, must be strictly increasing.
   * @param values value array, must have the same length as indices.
   */
  @Since("2.0.0")
  def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector =
    new SparseVector(size, indices, values)

  /**
   * Builds a sparse vector from unordered (index, value) pairs.
   *
   * @param size vector size.
   * @param elements vector elements in (index, value) pairs.
   */
  @Since("2.0.0")
  def sparse(size: Int, elements: Seq[(Int, Double)]): Vector = {
    // Order the pairs by index first, as the sparse representation expects increasing indices.
    val ordered = elements.sortBy(_._1)
    val (sortedIndices, sortedValues) = ordered.unzip
    new SparseVector(size, sortedIndices.toArray, sortedValues.toArray)
  }

  /**
   * Java-friendly variant that builds a sparse vector from unordered (index, value) pairs.
   *
   * @param size vector size.
   * @param elements vector elements in (index, value) pairs.
   */
  @Since("2.0.0")
  def sparse(size: Int, elements: JavaIterable[(JavaInteger, JavaDouble)]): Vector = {
    // Unbox the Java wrappers, then delegate to the Scala Seq overload.
    val unboxed = elements.asScala.map { case (idx, value) =>
      (idx.intValue(), value.doubleValue())
    }.toSeq
    sparse(size, unboxed)
  }

  /**
   * Builds a dense vector whose entries are all zero.
   *
   * @param size vector size
   * @return a zero vector
   */
  @Since("2.0.0")
  def zeros(size: Int): Vector = {
    val zeroValues = new Array[Double](size)
    new DenseVector(zeroValues)
  }

  /**
   * Builds a vector instance from a breeze vector. Only breeze dense and sparse vectors are
   * supported; any other breeze vector type is reported through `sys.error`.
   */
  private[spark] def fromBreeze(breezeVector: BV[Double]): Vector = {
    breezeVector match {
      case bdv: BDV[Double] =>
        val coversWholeArray =
          bdv.offset == 0 && bdv.stride == 1 && bdv.length == bdv.data.length
        // The underlying array can only be shared when the breeze vector spans all of it;
        // otherwise a fresh array is materialized.
        val backing = if (coversWholeArray) bdv.data else bdv.toArray
        new DenseVector(backing)
      case bsv: BSV[Double] =>
        if (bsv.index.length == bsv.used) {
          new SparseVector(bsv.length, bsv.index, bsv.data)
        } else {
          // Only the first `used` slots of the breeze buffers are kept.
          val used = bsv.used
          new SparseVector(bsv.length, bsv.index.slice(0, used), bsv.data.slice(0, used))
        }
      case unsupported: BV[_] =>
        sys.error("Unsupported Breeze vector type: " + unsupported.getClass.getName)
    }
  }

  /**
   * Computes the p-norm of a vector.
   * @param vector input vector.
   * @param p norm; must be at least 1, and may be `Double.PositiveInfinity`.
   * @return norm in L^p^ space.
   */
  @Since("2.0.0")
  def norm(vector: Vector, p: Double): Double = {
    require(p >= 1.0, "To compute the p-norm of the vector, we require that you specify a p>=1. " +
      s"You specified p=$p.")
    // Only stored values are visited; inactive entries of a sparse vector are zero.
    val stored: Array[Double] = vector match {
      case dv: DenseVector => dv.values
      case sv: SparseVector => sv.values
      case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    }
    val count = stored.length
    var acc = 0.0
    var k = 0

    if (p == 1) {
      // L1: sum of absolute values.
      while (k < count) {
        acc += math.abs(stored(k))
        k += 1
      }
      acc
    } else if (p == 2) {
      // L2: square root of the sum of squares.
      while (k < count) {
        acc += stored(k) * stored(k)
        k += 1
      }
      math.sqrt(acc)
    } else if (p == Double.PositiveInfinity) {
      // L-infinity: largest absolute value (0.0 when nothing is stored).
      while (k < count) {
        val magnitude = math.abs(stored(k))
        if (magnitude > acc) acc = magnitude
        k += 1
      }
      acc
    } else {
      // General case: (sum |x|^p)^(1/p).
      while (k < count) {
        acc += math.pow(math.abs(stored(k)), p)
        k += 1
      }
      math.pow(acc, 1.0 / p)
    }
  }

  /**
   * Computes the squared Euclidean distance between two vectors of equal size.
   * @param v1 first Vector.
   * @param v2 second Vector.
   * @return squared distance between two Vectors.
   */
  @Since("2.0.0")
  def sqdist(v1: Vector, v2: Vector): Double = {
    require(v1.size == v2.size, s"Vector dimensions do not match: Dim(v1)=${v1.size} and Dim(v2)" +
      s"=${v2.size}.")
    (v1, v2) match {
      case (left: SparseVector, right: SparseVector) =>
        val leftIdx = left.indices
        val leftVals = left.values
        val rightIdx = right.indices
        val rightVals = right.values
        val leftCount = leftIdx.length
        val rightCount = rightIdx.length

        var total = 0.0
        var i = 0
        var j = 0
        // Merge the two index lists while both still have active entries.
        while (i < leftCount && j < rightCount) {
          val li = leftIdx(i)
          val rj = rightIdx(j)
          val delta =
            if (li < rj) {
              // Entry present only on the left.
              val x = leftVals(i)
              i += 1
              x
            } else if (rj < li) {
              // Entry present only on the right.
              val y = rightVals(j)
              j += 1
              y
            } else {
              // Same index on both sides.
              val d = leftVals(i) - rightVals(j)
              i += 1
              j += 1
              d
            }
          total += delta * delta
        }
        // Drain whatever remains on the left ...
        while (i < leftCount) {
          val x = leftVals(i)
          total += x * x
          i += 1
        }
        // ... and whatever remains on the right.
        while (j < rightCount) {
          val y = rightVals(j)
          total += y * y
          j += 1
        }
        total

      case (sv: SparseVector, dv: DenseVector) =>
        sqdist(sv, dv)

      case (dv: DenseVector, sv: SparseVector) =>
        sqdist(sv, dv)

      case (DenseVector(a), DenseVector(b)) =>
        val n = a.length
        var total = 0.0
        var pos = 0
        while (pos < n) {
          val delta = a(pos) - b(pos)
          total += delta * delta
          pos += 1
        }
        total

      case _ =>
        throw new IllegalArgumentException("Do not support vector type " + v1.getClass +
          " and " + v2.getClass)
    }
  }

  /**
   * Computes the squared distance between a SparseVector and a DenseVector by walking every
   * position of the dense vector.
   */
  private[ml] def sqdist(v1: SparseVector, v2: DenseVector): Double = {
    val activeIdx = v1.indices
    val lastCursor = activeIdx.length - 1
    val denseSize = v2.size
    // `cursor` points at the next unconsumed sparse entry; `nextActive` is its index, or -1
    // when the sparse vector stores nothing.
    var cursor = 0
    var nextActive = if (lastCursor >= 0) activeIdx(cursor) else -1
    var total = 0.0
    var pos = 0

    while (pos < denseSize) {
      val delta =
        if (pos == nextActive) {
          val d = v1.values(cursor) - v2(pos)
          // Advance unless this was the last sparse entry.
          if (cursor < lastCursor) {
            cursor += 1
            nextActive = activeIdx(cursor)
          }
          d
        } else {
          // The sparse side is zero at this position.
          v2(pos)
        }
      total += delta * delta
      pos += 1
    }
    total
  }

  /**
   * Checks whether two vectors, each given as parallel index/value sequences, have the same
   * nonzero entries. Explicitly stored zeros are skipped on both sides.
   */
  private[ml] def equals(
      v1Indices: IndexedSeq[Int],
      v1Values: Array[Double],
      v2Indices: IndexedSeq[Int],
      v2Values: Array[Double]): Boolean = {
    val n1 = v1Values.length
    val n2 = v2Values.length
    var i = 0
    var j = 0
    while (true) {
      // Move both cursors to their next nonzero value.
      while (i < n1 && v1Values(i) == 0) i += 1
      while (j < n2 && v2Values(j) == 0) j += 1

      if (i >= n1 || j >= n2) {
        // Equal only if both sides ran out of nonzeros at the same time.
        return i >= n1 && j >= n2
      }
      if (v1Indices(i) != v2Indices(j) || v1Values(i) != v2Values(j)) {
        return false
      }
      i += 1
      j += 1
    }
    // Not reachable: the loop above only exits through a return.
    false
  }

  /** Upper bound on the number of nonzero entries that contribute to a hash code. */
  private[linalg] val MAX_HASH_NNZ = 128
}
433
/**
 * A dense vector that stores every entry in a single value array.
 */
@Since("2.0.0")
class DenseVector @Since("2.0.0") ( @Since("2.0.0") val values: Array[Double]) extends Vector {

  /** The size equals the length of the value array. */
  override def size: Int = values.length

  /** Renders the vector as `[v0,v1,...]`. */
  override def toString: String = values.mkString("[", ",", "]")

  /** Returns the backing value array itself; no copy is made. */
  override def toArray: Array[Double] = values

  private[spark] override def asBreeze: BV[Double] = new BDV[Double](values)

  override def apply(i: Int): Double = values(i)

  /** Deep copy backed by a clone of the value array. */
  override def copy: DenseVector = new DenseVector(values.clone())

  /** Every entry of a dense vector is active, so `f` is invoked for each position in order. */
  override def foreachActive(f: (Int, Double) => Unit): Unit = {
    val data = values
    val n = data.length
    var pos = 0
    while (pos < n) {
      f(pos, data(pos))
      pos += 1
    }
  }

  override def equals(other: Any): Boolean = super.equals(other)

  /**
   * Hash code built from the size and the first `Vectors.MAX_HASH_NNZ` nonzero entries.
   * Zeros are skipped.
   */
  override def hashCode(): Int = {
    var hash: Int = 31 + size
    val n = values.length
    var pos = 0
    var counted = 0
    while (pos < n && counted < Vectors.MAX_HASH_NNZ) {
      val x = values(pos)
      if (x != 0.0) {
        val bits = java.lang.Double.doubleToLongBits(x)
        hash = 31 * hash + pos
        hash = 31 * hash + (bits ^ (bits >>> 32)).toInt
        counted += 1
      }
      pos += 1
    }
    hash
  }

  override def numActives: Int = size

  /** Counts the entries that differ from zero. */
  override def numNonzeros: Int = {
    // Equivalent to values.count(_ != 0.0), written as a plain loop.
    val n = values.length
    var count = 0
    var pos = 0
    while (pos < n) {
      if (values(pos) != 0.0) {
        count += 1
      }
      pos += 1
    }
    count
  }

  /** Builds a sparse vector holding only the nonzero entries of this vector. */
  override def toSparse: SparseVector = {
    val nonzeros = numNonzeros
    val keptIndices = new Array[Int](nonzeros)
    val keptValues = new Array[Double](nonzeros)
    var next = 0
    foreachActive { (index, value) =>
      if (value != 0) {
        keptIndices(next) = index
        keptValues(next) = value
        next += 1
      }
    }
    new SparseVector(size, keptIndices, keptValues)
  }

  /** Index of the first maximal entry, or -1 for an empty vector. */
  override def argmax: Int = {
    if (size == 0) {
      -1
    } else {
      var bestIdx = 0
      var bestValue = values(0)
      var pos = 1
      while (pos < size) {
        val candidate = values(pos)
        // Strict comparison keeps the earliest index on ties.
        if (candidate > bestValue) {
          bestIdx = pos
          bestValue = candidate
        }
        pos += 1
      }
      bestIdx
    }
  }
}
531
/**
 * Companion of [[DenseVector]] providing an extractor for pattern matching.
 */
@Since("2.0.0")
object DenseVector {

  /**
   * Extractor that exposes the value array of a dense vector, enabling patterns such as
   * `case DenseVector(values) => ...`.
   */
  @Since("2.0.0")
  def unapply(dv: DenseVector): Option[Array[Double]] = {
    val backing = dv.values
    Some(backing)
  }
}
539
/**
 * A sparse vector represented by an index array and a value array.
 *
 * The constructor validates its arguments and throws `IllegalArgumentException` (through
 * `require`) when the size is negative, when the two arrays differ in length, when more
 * entries than `size` are supplied, or when the indices are negative, not strictly
 * increasing, or out of bounds.
 *
 * @param size size of the vector.
 * @param indices index array, assume to be strictly increasing.
 * @param values value array, must have the same length as the index array.
 */
@Since("2.0.0")
class SparseVector @Since("2.0.0") (
    override val size: Int,
    @Since("2.0.0") val indices: Array[Int],
    @Since("2.0.0") val values: Array[Double]) extends Vector {

  // validate the data
  {
    // A size of zero is legal, so the message must not claim the size has to be positive.
    require(size >= 0, "The size of the requested sparse vector must be no less than 0.")
    require(indices.length == values.length, "Sparse vectors require that the dimension of the" +
      s" indices match the dimension of the values. You provided ${indices.length} indices and " +
      s" ${values.length} values.")
    require(indices.length <= size, s"You provided ${indices.length} indices and values, " +
      s"which exceeds the specified vector size ${size}.")

    if (indices.nonEmpty) {
      require(indices(0) >= 0, s"Found negative index: ${indices(0)}.")
    }
    // Walk the indices once, checking that each one is larger than its predecessor.
    var prev = -1
    indices.foreach { i =>
      require(prev < i, s"Index $i follows $prev and is not strictly increasing")
      prev = i
    }
    // `prev` now holds the largest index (or -1 when there are none).
    require(prev < size, s"Index $prev out of bounds for vector of size $size")
  }

  /** Renders the vector as `(size,[indices],[values])`. */
  override def toString: String =
    s"($size,${indices.mkString("[", ",", "]")},${values.mkString("[", ",", "]")})"

  /** Materializes a fresh dense array of length `size`; inactive positions are 0.0. */
  override def toArray: Array[Double] = {
    val data = new Array[Double](size)
    var i = 0
    val nnz = indices.length
    while (i < nnz) {
      data(indices(i)) = values(i)
      i += 1
    }
    data
  }

  /** Deep copy backed by clones of the index and value arrays. */
  override def copy: SparseVector = {
    new SparseVector(size, indices.clone(), values.clone())
  }

  private[spark] override def asBreeze: BV[Double] = new BSV[Double](indices, values, size)

  /** Invokes `f` on every stored (index, value) pair, in storage order. */
  override def foreachActive(f: (Int, Double) => Unit): Unit = {
    var i = 0
    val localValuesSize = values.length
    val localIndices = indices
    val localValues = values

    while (i < localValuesSize) {
      f(localIndices(i), localValues(i))
      i += 1
    }
  }

  override def equals(other: Any): Boolean = super.equals(other)

  /**
   * Hash code built from the size and the first `Vectors.MAX_HASH_NNZ` nonzero entries.
   * Explicitly stored zeros are skipped.
   */
  override def hashCode(): Int = {
    var result: Int = 31 + size
    val end = values.length
    var k = 0
    var nnz = 0
    while (k < end && nnz < Vectors.MAX_HASH_NNZ) {
      val v = values(k)
      if (v != 0.0) {
        val i = indices(k)
        result = 31 * result + i
        val bits = java.lang.Double.doubleToLongBits(v)
        result = 31 * result + (bits ^ (bits >>> 32)).toInt
        nnz += 1
      }
      k += 1
    }
    result
  }

  /** Number of stored entries, including explicitly stored zeros. */
  override def numActives: Int = values.length

  /** Counts the stored values that differ from zero. */
  override def numNonzeros: Int = {
    var nnz = 0
    values.foreach { v =>
      if (v != 0.0) {
        nnz += 1
      }
    }
    nnz
  }

  /**
   * Returns a sparse vector without explicitly stored zeros. When there are none to remove,
   * this same instance is returned.
   */
  override def toSparse: SparseVector = {
    val nnz = numNonzeros
    if (nnz == numActives) {
      this
    } else {
      val ii = new Array[Int](nnz)
      val vv = new Array[Double](nnz)
      var k = 0
      foreachActive { (i, v) =>
        if (v != 0.0) {
          ii(k) = i
          vv(k) = v
          k += 1
        }
      }
      new SparseVector(size, ii, vv)
    }
  }

  /**
   * Index of the first maximal element, taking inactive (zero) entries into account.
   * Returns -1 if the vector has length 0.
   */
  override def argmax: Int = {
    val na = numActives
    if (size == 0) {
      -1
    } else if (na == 0) {
      // No stored entries: every element is zero, so the first maximal element is index 0.
      // Without this guard the lookups indices(0) / values(0) below would be out of bounds.
      0
    } else {
      // Find the max active entry.
      var maxIdx = indices(0)
      var maxValue = values(0)
      var maxJ = 0
      var j = 1
      while (j < na) {
        val v = values(j)
        if (v > maxValue) {
          maxValue = v
          maxIdx = indices(j)
          maxJ = j
        }
        j += 1
      }

      // If the max active entry is nonpositive and there exists inactive ones, find the first zero.
      if (maxValue <= 0.0 && na < size) {
        if (maxValue == 0.0) {
          // If there exists an inactive entry before maxIdx, find it and return its index.
          if (maxJ < maxIdx) {
            var k = 0
            while (k < maxJ && indices(k) == k) {
              k += 1
            }
            maxIdx = k
          }
        } else {
          // If the max active value is negative, find and return the first inactive index.
          var k = 0
          while (k < na && indices(k) == k) {
            k += 1
          }
          maxIdx = k
        }
      }

      maxIdx
    }
  }

  /**
   * Create a slice of this vector based on the given indices.
   * @param selectedIndices Unsorted list of indices into the vector.
   *                        This does NOT do bound checking.
   * @return  New SparseVector with values in the order specified by the given indices.
   *
   * NOTE: The API needs to be discussed before making this public.
   *       Also, if we have a version assuming indices are sorted, we should optimize it.
   */
  private[spark] def slice(selectedIndices: Array[Int]): SparseVector = {
    // Position of the current selected index within the resulting slice.
    var currentIdx = 0
    val (sliceInds, sliceVals) = selectedIndices.flatMap { origIdx =>
      // A non-negative search result means origIdx is an active entry of this vector.
      val iIdx = java.util.Arrays.binarySearch(this.indices, origIdx)
      val i_v = if (iIdx >= 0) {
        Iterator((currentIdx, this.values(iIdx)))
      } else {
        // Inactive in this vector, so it stays inactive in the slice as well.
        Iterator()
      }
      currentIdx += 1
      i_v
    }.unzip
    new SparseVector(selectedIndices.length, sliceInds.toArray, sliceVals.toArray)
  }
}
726
/**
 * Companion of [[SparseVector]] providing an extractor for pattern matching.
 */
@Since("2.0.0")
object SparseVector {

  /**
   * Extractor that exposes the size, index array and value array of a sparse vector, enabling
   * patterns such as `case SparseVector(size, indices, values) => ...`.
   */
  @Since("2.0.0")
  def unapply(sv: SparseVector): Option[(Int, Array[Int], Array[Double])] = {
    val parts = (sv.size, sv.indices, sv.values)
    Some(parts)
  }
}
733