#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import random

from pyspark import SparkContext, RDD, since
from pyspark.mllib.common import callMLlibFunc, inherit_doc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import JavaLoader, JavaSaveable

__all__ = ['DecisionTreeModel', 'DecisionTree', 'RandomForestModel',
           'RandomForest', 'GradientBoostedTreesModel', 'GradientBoostedTrees']


class TreeEnsembleModel(JavaModelWrapper, JavaSaveable):
    """Represents a tree ensemble model.

    .. versionadded:: 1.3.0
    """
    @since("1.3.0")
    def predict(self, x):
        """
        Predict values for a single data point or an RDD of points using
        the model trained.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.
        """
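        # Dispatch on the input type: an RDD of points is converted
        # element-wise to MLlib vectors and scored in a single JVM call,
        # while a single point is converted and scored directly.
        #
        # Usage sketch (illustrative; assumes a trained `model` and an RDD
        # `rdd` of feature vectors) -- score the RDD itself rather than
        # calling predict inside a transformation:
        #
        #   preds = model.predict(rdd)                 # supported
        #   rdd.map(lambda p: model.predict(p))        # not supported in Python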
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))
        else:
            return self.call("predict", _convert_to_vector(x))

    @since("1.3.0")
    def numTrees(self):
        """
        Get number of trees in ensemble.
        """
        return self.call("numTrees")

    @since("1.3.0")
    def totalNumNodes(self):
        """
        Get total number of nodes, summed over all trees in the ensemble.
        """
        return self.call("totalNumNodes")

    def __repr__(self):
        """Summary of model."""
        return self._java_model.toString()

    @since("1.3.0")
    def toDebugString(self):
        """Full model."""
        return self._java_model.toDebugString()


class DecisionTreeModel(JavaModelWrapper, JavaSaveable, JavaLoader):
    """
    A decision tree model for classification or regression.

    .. versionadded:: 1.1.0
    """
    @since("1.1.0")
    def predict(self, x):
        """
        Predict the label of one or more examples.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.

        :param x:
          Data point (feature vector), or an RDD of data points (feature
          vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))
        else:
            return self.call("predict", _convert_to_vector(x))

    @since("1.1.0")
    def numNodes(self):
        """Get number of nodes in tree, including leaf nodes."""
        return self._java_model.numNodes()

    @since("1.1.0")
    def depth(self):
        """
        Get depth of tree (e.g. depth 0 means 1 leaf node, depth 1
        means 1 internal node + 2 leaf nodes).
        """
        return self._java_model.depth()

    def __repr__(self):
        """Summary of model."""
        return self._java_model.toString()

    @since("1.2.0")
    def toDebugString(self):
        """Full model."""
        return self._java_model.toDebugString()

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.DecisionTreeModel"
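
    # Save/load round-trip sketch (illustrative; assumes an active
    # SparkContext `sc` and a writable path -- the path is hypothetical):
    #
    #   model.save(sc, "/tmp/myDecisionTreeModel")
    #   sameModel = DecisionTreeModel.load(sc, "/tmp/myDecisionTreeModel")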


class DecisionTree(object):
    """
    Learning algorithm for a decision tree model for classification or
    regression.

    .. versionadded:: 1.1.0
    """

    @classmethod
    def _train(cls, data, algo, numClasses, categoricalFeaturesInfo, impurity="gini",
               maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
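        # Training happens on the JVM side: callMLlibFunc forwards the
        # arguments over the Py4J bridge to MLlib's Scala implementation,
        # and the returned Java model is wrapped for use from Python.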
        model = callMLlibFunc("trainDecisionTreeModel", data, algo, numClasses,
                              categoricalFeaturesInfo, impurity, maxDepth, maxBins,
                              minInstancesPerNode, minInfoGain)
        return DecisionTreeModel(model)

    @classmethod
    @since("1.1.0")
    def trainClassifier(cls, data, numClasses, categoricalFeaturesInfo,
                        impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                        minInfoGain=0.0):
        """
        Train a decision tree model for classification.

        :param data:
          Training data: RDD of LabeledPoint. Labels should take values
          {0, 1, ..., numClasses-1}.
        :param numClasses:
          Number of classes for classification.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param impurity:
          Criterion used for information gain calculation.
          Supported values: "gini" or "entropy".
          (default: "gini")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 5)
        :param maxBins:
          Number of bins used for finding splits at each node.
          (default: 32)
        :param minInstancesPerNode:
          Minimum number of instances required at child nodes to create
          the parent split.
          (default: 1)
        :param minInfoGain:
          Minimum info gain required to create a split.
          (default: 0.0)
        :return:
          DecisionTreeModel.

        Example usage:

        >>> from numpy import array
        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import DecisionTree
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(1.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
        >>> print(model)
        DecisionTreeModel classifier of depth 1 with 3 nodes

        >>> print(model.toDebugString())
        DecisionTreeModel classifier of depth 1 with 3 nodes
          If (feature 0 <= 0.0)
           Predict: 0.0
          Else (feature 0 > 0.0)
           Predict: 1.0
        <BLANKLINE>
        >>> model.predict(array([1.0]))
        1.0
        >>> model.predict(array([0.0]))
        0.0
        >>> rdd = sc.parallelize([[1.0], [0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", numClasses, categoricalFeaturesInfo,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)

    @classmethod
    @since("1.1.0")
    def trainRegressor(cls, data, categoricalFeaturesInfo,
                       impurity="variance", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                       minInfoGain=0.0):
        """
        Train a decision tree model for regression.

        :param data:
          Training data: RDD of LabeledPoint. Labels are real numbers.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param impurity:
          Criterion used for information gain calculation.
          The only supported value for regression is "variance".
          (default: "variance")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 5)
        :param maxBins:
          Number of bins used for finding splits at each node.
          (default: 32)
        :param minInstancesPerNode:
          Minimum number of instances required at child nodes to create
          the parent split.
          (default: 1)
        :param minInfoGain:
          Minimum info gain required to create a split.
          (default: 0.0)
        :return:
          DecisionTreeModel.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import DecisionTree
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {1: 0.0}))
        0.0
        >>> rdd = sc.parallelize([[0.0, 1.0], [0.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "regression", 0, categoricalFeaturesInfo,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)


@inherit_doc
class RandomForestModel(TreeEnsembleModel, JavaLoader):
    """
    Represents a random forest model.

    .. versionadded:: 1.2.0
    """

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.RandomForestModel"
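
    # Persistence mirrors DecisionTreeModel: a model saved with
    # model.save(sc, path) can be restored with
    # RandomForestModel.load(sc, path).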


class RandomForest(object):
    """
    Learning algorithm for a random forest model for classification or
    regression.

    .. versionadded:: 1.2.0
    """

    supportedFeatureSubsetStrategies = ("auto", "all", "sqrt", "log2", "onethird")
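    # The tuple above enumerates the accepted featureSubsetStrategy values;
    # _train rejects anything else with a ValueError.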

    @classmethod
    def _train(cls, data, algo, numClasses, categoricalFeaturesInfo, numTrees,
               featureSubsetStrategy, impurity, maxDepth, maxBins, seed):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
        if featureSubsetStrategy not in cls.supportedFeatureSubsetStrategies:
            raise ValueError("unsupported featureSubsetStrategy: %s" % featureSubsetStrategy)
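        # When no seed is supplied, draw one at random so that repeated
        # calls use different bootstrap samples and feature subsets.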
        if seed is None:
            seed = random.randint(0, 1 << 30)
        model = callMLlibFunc("trainRandomForestModel", data, algo, numClasses,
                              categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
                              maxDepth, maxBins, seed)
        return RandomForestModel(model)

    @classmethod
    @since("1.2.0")
    def trainClassifier(cls, data, numClasses, categoricalFeaturesInfo, numTrees,
                        featureSubsetStrategy="auto", impurity="gini", maxDepth=4, maxBins=32,
                        seed=None):
        """
        Train a random forest model for binary or multiclass
        classification.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels should take values
          {0, 1, ..., numClasses-1}.
        :param numClasses:
          Number of classes for classification.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param numTrees:
          Number of trees in the random forest.
        :param featureSubsetStrategy:
          Number of features to consider for splits at each node.
          Supported values: "auto", "all", "sqrt", "log2", "onethird".
          If "auto" is set, this parameter is set based on numTrees:
          if numTrees == 1, set to "all";
          if numTrees > 1 (forest), set to "sqrt".
          (default: "auto")
        :param impurity:
          Criterion used for information gain calculation.
          Supported values: "gini" or "entropy".
          (default: "gini")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 4)
        :param maxBins:
          Maximum number of bins used for splitting features.
          (default: 32)
        :param seed:
          Random seed for bootstrapping and choosing feature subsets.
          Set as None to generate seed based on system time.
          (default: None)
        :return:
          RandomForestModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import RandomForest
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(0.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>> model = RandomForest.trainClassifier(sc.parallelize(data), 2, {}, 3, seed=42)
        >>> model.numTrees()
        3
        >>> model.totalNumNodes()
        7
        >>> print(model)
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
        >>> print(model.toDebugString())
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
          Tree 0:
            Predict: 1.0
          Tree 1:
            If (feature 0 <= 1.0)
             Predict: 0.0
            Else (feature 0 > 1.0)
             Predict: 1.0
          Tree 2:
            If (feature 0 <= 1.0)
             Predict: 0.0
            Else (feature 0 > 1.0)
             Predict: 1.0
        <BLANKLINE>
        >>> model.predict([2.0])
        1.0
        >>> model.predict([0.0])
        0.0
        >>> rdd = sc.parallelize([[3.0], [1.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", numClasses,
                          categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
                          maxDepth, maxBins, seed)

    @classmethod
    @since("1.2.0")
    def trainRegressor(cls, data, categoricalFeaturesInfo, numTrees, featureSubsetStrategy="auto",
                       impurity="variance", maxDepth=4, maxBins=32, seed=None):
        """
        Train a random forest model for regression.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels are real numbers.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param numTrees:
          Number of trees in the random forest.
        :param featureSubsetStrategy:
          Number of features to consider for splits at each node.
          Supported values: "auto", "all", "sqrt", "log2", "onethird".
          If "auto" is set, this parameter is set based on numTrees:
          if numTrees == 1, set to "all";
          if numTrees > 1 (forest), set to "onethird" for regression.
          (default: "auto")
        :param impurity:
          Criterion used for information gain calculation.
          The only supported value for regression is "variance".
          (default: "variance")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 4)
        :param maxBins:
          Maximum number of bins used for splitting features.
          (default: 32)
        :param seed:
          Random seed for bootstrapping and choosing feature subsets.
          Set as None to generate seed based on system time.
          (default: None)
        :return:
          RandomForestModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import RandomForest
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = RandomForest.trainRegressor(sc.parallelize(sparse_data), {}, 2, seed=42)
        >>> model.numTrees()
        2
        >>> model.totalNumNodes()
        4
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {0: 1.0}))
        0.5
        >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.5]
        """
        return cls._train(data, "regression", 0, categoricalFeaturesInfo, numTrees,
                          featureSubsetStrategy, impurity, maxDepth, maxBins, seed)


@inherit_doc
class GradientBoostedTreesModel(TreeEnsembleModel, JavaLoader):
    """
    Represents a gradient-boosted tree model.

    .. versionadded:: 1.3.0
    """

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.GradientBoostedTreesModel"
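
    # As with the other tree models, instances round-trip with
    # model.save(sc, path) and GradientBoostedTreesModel.load(sc, path).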


class GradientBoostedTrees(object):
    """
    Learning algorithm for a gradient boosted trees model for
    classification or regression.

    .. versionadded:: 1.3.0
    """

    @classmethod
    def _train(cls, data, algo, categoricalFeaturesInfo,
               loss, numIterations, learningRate, maxDepth, maxBins):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
        model = callMLlibFunc("trainGradientBoostedTreesModel", data, algo,
                              categoricalFeaturesInfo, loss, numIterations, learningRate,
                              maxDepth, maxBins)
        return GradientBoostedTreesModel(model)

    @classmethod
    @since("1.3.0")
    def trainClassifier(cls, data, categoricalFeaturesInfo,
                        loss="logLoss", numIterations=100, learningRate=0.1, maxDepth=3,
                        maxBins=32):
        """
        Train a gradient-boosted trees model for classification.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels should take values
          {0, 1}.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param loss:
          Loss function used for minimization during gradient boosting.
          Supported values: "logLoss", "leastSquaresError",
          "leastAbsoluteError".
          (default: "logLoss")
        :param numIterations:
          Number of iterations of boosting.
          (default: 100)
        :param learningRate:
          Learning rate for shrinking the contribution of each estimator.
          The learning rate should be in the interval (0, 1].
          (default: 0.1)
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 3)
        :param maxBins:
          Maximum number of bins used for splitting features. DecisionTree
          requires maxBins >= max categories.
          (default: 32)
        :return:
          GradientBoostedTreesModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import GradientBoostedTrees
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(0.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>>
        >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10)
        >>> model.numTrees()
        10
        >>> model.totalNumNodes()
        30
        >>> print(model)  # the summary already ends with a newline
        TreeEnsembleModel classifier with 10 trees
        <BLANKLINE>
        >>> model.predict([2.0])
        1.0
        >>> model.predict([0.0])
        0.0
        >>> rdd = sc.parallelize([[2.0], [0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", categoricalFeaturesInfo,
                          loss, numIterations, learningRate, maxDepth, maxBins)

    @classmethod
    @since("1.3.0")
    def trainRegressor(cls, data, categoricalFeaturesInfo,
                       loss="leastSquaresError", numIterations=100, learningRate=0.1, maxDepth=3,
                       maxBins=32):
        """
        Train a gradient-boosted trees model for regression.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels are real numbers.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param loss:
          Loss function used for minimization during gradient boosting.
          Supported values: "logLoss", "leastSquaresError",
          "leastAbsoluteError".
          (default: "leastSquaresError")
        :param numIterations:
          Number of iterations of boosting.
          (default: 100)
        :param learningRate:
          Learning rate for shrinking the contribution of each estimator.
          The learning rate should be in the interval (0, 1].
          (default: 0.1)
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 3)
        :param maxBins:
          Maximum number of bins used for splitting features. DecisionTree
          requires maxBins >= max categories.
          (default: 32)
        :return:
          GradientBoostedTreesModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import GradientBoostedTrees
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> data = sc.parallelize(sparse_data)
        >>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10)
        >>> model.numTrees()
        10
        >>> model.totalNumNodes()
        12
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {0: 1.0}))
        0.0
        >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "regression", categoricalFeaturesInfo,
                          loss, numIterations, learningRate, maxDepth, maxBins)


def _test():
    import doctest
    globs = globals().copy()
    from pyspark.sql import SparkSession
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("mllib.tree tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext
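    # The doctests in this module reference a SparkContext named `sc`;
    # supply it through the doctest globals before running them.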
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()