#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import random

from pyspark import SparkContext, RDD, since
from pyspark.mllib.common import callMLlibFunc, inherit_doc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import JavaLoader, JavaSaveable

__all__ = ['DecisionTreeModel', 'DecisionTree', 'RandomForestModel',
           'RandomForest', 'GradientBoostedTreesModel', 'GradientBoostedTrees']


class TreeEnsembleModel(JavaModelWrapper, JavaSaveable):
    """TreeEnsembleModel

    .. versionadded:: 1.3.0
    """
    @since("1.3.0")
    def predict(self, x):
        """
        Predict values for a single data point or an RDD of points using
        the model trained.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))

    @since("1.3.0")
    def numTrees(self):
        """
        Get number of trees in ensemble.
        """
        return self.call("numTrees")

    @since("1.3.0")
    def totalNumNodes(self):
        """
        Get total number of nodes, summed over all trees in the ensemble.
        """
        return self.call("totalNumNodes")

    def __repr__(self):
        """ Summary of model """
        return self._java_model.toString()

    @since("1.3.0")
    def toDebugString(self):
        """ Full model """
        return self._java_model.toDebugString()


class DecisionTreeModel(JavaModelWrapper, JavaSaveable, JavaLoader):
    """
    A decision tree model for classification or regression.

    .. versionadded:: 1.1.0
    """
    @since("1.1.0")
    def predict(self, x):
        """
        Predict the label of one or more examples.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.

        :param x:
          Data point (feature vector), or an RDD of data points (feature
          vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))

    @since("1.1.0")
    def numNodes(self):
        """Get number of nodes in tree, including leaf nodes."""
        return self._java_model.numNodes()

    @since("1.1.0")
    def depth(self):
        """
        Get depth of tree (e.g. depth 0 means 1 leaf node, depth 1
        means 1 internal node + 2 leaf nodes).
        """
        return self._java_model.depth()

    def __repr__(self):
        """ summary of model. """
        return self._java_model.toString()

    @since("1.2.0")
    def toDebugString(self):
        """ full model. """
        return self._java_model.toDebugString()

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.DecisionTreeModel"


class DecisionTree(object):
    """
    Learning algorithm for a decision tree model for classification or
    regression.

    .. versionadded:: 1.1.0
    """

    @classmethod
    def _train(cls, data, type, numClasses, features, impurity="gini", maxDepth=5, maxBins=32,
               minInstancesPerNode=1, minInfoGain=0.0):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        model = callMLlibFunc("trainDecisionTreeModel", data, type, numClasses, features,
                              impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
        return DecisionTreeModel(model)

    @classmethod
    @since("1.1.0")
    def trainClassifier(cls, data, numClasses, categoricalFeaturesInfo,
                        impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                        minInfoGain=0.0):
        """
        Train a decision tree model for classification.

        :param data:
          Training data: RDD of LabeledPoint. Labels should take values
          {0, 1, ..., numClasses-1}.
        :param numClasses:
          Number of classes for classification.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param impurity:
          Criterion used for information gain calculation.
          Supported values: "gini" or "entropy".
          (default: "gini")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 5)
        :param maxBins:
          Number of bins used for finding splits at each node.
          (default: 32)
        :param minInstancesPerNode:
          Minimum number of instances required at child nodes to create
          the parent split.
          (default: 1)
        :param minInfoGain:
          Minimum info gain required to create a split.
          (default: 0.0)
        :return:
          DecisionTreeModel.

        Example usage:

        >>> from numpy import array
        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import DecisionTree
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(1.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
        >>> print(model)
        DecisionTreeModel classifier of depth 1 with 3 nodes

        >>> print(model.toDebugString())
        DecisionTreeModel classifier of depth 1 with 3 nodes
          If (feature 0 <= 0.0)
           Predict: 0.0
          Else (feature 0 > 0.0)
           Predict: 1.0
        <BLANKLINE>
        >>> model.predict(array([1.0]))
        1.0
        >>> model.predict(array([0.0]))
        0.0
        >>> rdd = sc.parallelize([[1.0], [0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", numClasses, categoricalFeaturesInfo,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)

    @classmethod
    @since("1.1.0")
    def trainRegressor(cls, data, categoricalFeaturesInfo,
                       impurity="variance", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                       minInfoGain=0.0):
        """
        Train a decision tree model for regression.

        :param data:
          Training data: RDD of LabeledPoint. Labels are real numbers.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param impurity:
          Criterion used for information gain calculation.
          The only supported value for regression is "variance".
          (default: "variance")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 5)
        :param maxBins:
          Number of bins used for finding splits at each node.
          (default: 32)
        :param minInstancesPerNode:
          Minimum number of instances required at child nodes to create
          the parent split.
          (default: 1)
        :param minInfoGain:
          Minimum info gain required to create a split.
          (default: 0.0)
        :return:
          DecisionTreeModel.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import DecisionTree
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {1: 0.0}))
        0.0
        >>> rdd = sc.parallelize([[0.0, 1.0], [0.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "regression", 0, categoricalFeaturesInfo,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)


@inherit_doc
class RandomForestModel(TreeEnsembleModel, JavaLoader):
    """
    Represents a random forest model.

    .. versionadded:: 1.2.0
    """

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.RandomForestModel"


class RandomForest(object):
    """
    Learning algorithm for a random forest model for classification or
    regression.

    .. versionadded:: 1.2.0
    """

    supportedFeatureSubsetStrategies = ("auto", "all", "sqrt", "log2", "onethird")

    @classmethod
    def _train(cls, data, algo, numClasses, categoricalFeaturesInfo, numTrees,
               featureSubsetStrategy, impurity, maxDepth, maxBins, seed):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        if featureSubsetStrategy not in cls.supportedFeatureSubsetStrategies:
            raise ValueError("unsupported featureSubsetStrategy: %s" % featureSubsetStrategy)
        if seed is None:
            seed = random.randint(0, 1 << 30)
        model = callMLlibFunc("trainRandomForestModel", data, algo, numClasses,
                              categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
                              maxDepth, maxBins, seed)
        return RandomForestModel(model)

    @classmethod
    @since("1.2.0")
    def trainClassifier(cls, data, numClasses, categoricalFeaturesInfo, numTrees,
                        featureSubsetStrategy="auto", impurity="gini", maxDepth=4, maxBins=32,
                        seed=None):
        """
        Train a random forest model for binary or multiclass
        classification.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels should take values
          {0, 1, ..., numClasses-1}.
        :param numClasses:
          Number of classes for classification.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param numTrees:
          Number of trees in the random forest.
        :param featureSubsetStrategy:
          Number of features to consider for splits at each node.
          Supported values: "auto", "all", "sqrt", "log2", "onethird".
          If "auto" is set, this parameter is set based on numTrees:
          if numTrees == 1, set to "all";
          if numTrees > 1 (forest) set to "sqrt".
          (default: "auto")
        :param impurity:
          Criterion used for information gain calculation.
          Supported values: "gini" or "entropy".
          (default: "gini")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 4)
        :param maxBins:
          Maximum number of bins used for splitting features.
          (default: 32)
        :param seed:
          Random seed for bootstrapping and choosing feature subsets.
          Set as None to generate seed based on system time.
          (default: None)
        :return:
          RandomForestModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import RandomForest
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(0.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>> model = RandomForest.trainClassifier(sc.parallelize(data), 2, {}, 3, seed=42)
        >>> model.numTrees()
        3
        >>> model.totalNumNodes()
        7
        >>> print(model)
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
        >>> print(model.toDebugString())
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
          Tree 0:
            Predict: 1.0
          Tree 1:
            If (feature 0 <= 1.0)
             Predict: 0.0
            Else (feature 0 > 1.0)
             Predict: 1.0
          Tree 2:
            If (feature 0 <= 1.0)
             Predict: 0.0
            Else (feature 0 > 1.0)
             Predict: 1.0
        <BLANKLINE>
        >>> model.predict([2.0])
        1.0
        >>> model.predict([0.0])
        0.0
        >>> rdd = sc.parallelize([[3.0], [1.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", numClasses,
                          categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
                          maxDepth, maxBins, seed)

    @classmethod
    @since("1.2.0")
    def trainRegressor(cls, data, categoricalFeaturesInfo, numTrees, featureSubsetStrategy="auto",
                       impurity="variance", maxDepth=4, maxBins=32, seed=None):
        """
        Train a random forest model for regression.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels are real numbers.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param numTrees:
          Number of trees in the random forest.
        :param featureSubsetStrategy:
          Number of features to consider for splits at each node.
          Supported values: "auto", "all", "sqrt", "log2", "onethird".
          If "auto" is set, this parameter is set based on numTrees:
          if numTrees == 1, set to "all";
          if numTrees > 1 (forest) set to "onethird" for regression.
          (default: "auto")
        :param impurity:
          Criterion used for information gain calculation.
          The only supported value for regression is "variance".
          (default: "variance")
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 4)
        :param maxBins:
          Maximum number of bins used for splitting features.
          (default: 32)
        :param seed:
          Random seed for bootstrapping and choosing feature subsets.
          Set as None to generate seed based on system time.
          (default: None)
        :return:
          RandomForestModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import RandomForest
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = RandomForest.trainRegressor(sc.parallelize(sparse_data), {}, 2, seed=42)
        >>> model.numTrees()
        2
        >>> model.totalNumNodes()
        4
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {0: 1.0}))
        0.5
        >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.5]
        """
        return cls._train(data, "regression", 0, categoricalFeaturesInfo, numTrees,
                          featureSubsetStrategy, impurity, maxDepth, maxBins, seed)


@inherit_doc
class GradientBoostedTreesModel(TreeEnsembleModel, JavaLoader):
    """
    Represents a gradient-boosted tree model.

    .. versionadded:: 1.3.0
    """

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.GradientBoostedTreesModel"


class GradientBoostedTrees(object):
    """
    Learning algorithm for a gradient boosted trees model for
    classification or regression.

    .. versionadded:: 1.3.0
    """

    @classmethod
    def _train(cls, data, algo, categoricalFeaturesInfo,
               loss, numIterations, learningRate, maxDepth, maxBins):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        model = callMLlibFunc("trainGradientBoostedTreesModel", data, algo, categoricalFeaturesInfo,
                              loss, numIterations, learningRate, maxDepth, maxBins)
        return GradientBoostedTreesModel(model)

    @classmethod
    @since("1.3.0")
    def trainClassifier(cls, data, categoricalFeaturesInfo,
                        loss="logLoss", numIterations=100, learningRate=0.1, maxDepth=3,
                        maxBins=32):
        """
        Train a gradient-boosted trees model for classification.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels should take values
          {0, 1}.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param loss:
          Loss function used for minimization during gradient boosting.
          Supported values: "logLoss", "leastSquaresError",
          "leastAbsoluteError".
          (default: "logLoss")
        :param numIterations:
          Number of iterations of boosting.
          (default: 100)
        :param learningRate:
          Learning rate for shrinking the contribution of each estimator.
          The learning rate should be between in the interval (0, 1].
          (default: 0.1)
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 3)
        :param maxBins:
          Maximum number of bins used for splitting features. DecisionTree
          requires maxBins >= max categories.
          (default: 32)
        :return:
          GradientBoostedTreesModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import GradientBoostedTrees
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(0.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>>
        >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10)
        >>> model.numTrees()
        10
        >>> model.totalNumNodes()
        30
        >>> print(model)  # it already has newline
        TreeEnsembleModel classifier with 10 trees
        <BLANKLINE>
        >>> model.predict([2.0])
        1.0
        >>> model.predict([0.0])
        0.0
        >>> rdd = sc.parallelize([[2.0], [0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", categoricalFeaturesInfo,
                          loss, numIterations, learningRate, maxDepth, maxBins)

    @classmethod
    @since("1.3.0")
    def trainRegressor(cls, data, categoricalFeaturesInfo,
                       loss="leastSquaresError", numIterations=100, learningRate=0.1, maxDepth=3,
                       maxBins=32):
        """
        Train a gradient-boosted trees model for regression.

        :param data:
          Training dataset: RDD of LabeledPoint. Labels are real numbers.
        :param categoricalFeaturesInfo:
          Map storing arity of categorical features. An entry (n -> k)
          indicates that feature n is categorical with k categories
          indexed from 0: {0, 1, ..., k-1}.
        :param loss:
          Loss function used for minimization during gradient boosting.
          Supported values: "logLoss", "leastSquaresError",
          "leastAbsoluteError".
          (default: "leastSquaresError")
        :param numIterations:
          Number of iterations of boosting.
          (default: 100)
        :param learningRate:
          Learning rate for shrinking the contribution of each estimator.
          The learning rate should be between in the interval (0, 1].
          (default: 0.1)
        :param maxDepth:
          Maximum depth of tree (e.g. depth 0 means 1 leaf node, depth 1
          means 1 internal node + 2 leaf nodes).
          (default: 3)
        :param maxBins:
          Maximum number of bins used for splitting features. DecisionTree
          requires maxBins >= max categories.
          (default: 32)
        :return:
          GradientBoostedTreesModel that can be used for prediction.

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import GradientBoostedTrees
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> data = sc.parallelize(sparse_data)
        >>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10)
        >>> model.numTrees()
        10
        >>> model.totalNumNodes()
        12
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {0: 1.0}))
        0.0
        >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "regression", categoricalFeaturesInfo,
                          loss, numIterations, learningRate, maxDepth, maxBins)


def _test():
    # Run this module's doctests against a local 4-core Spark session;
    # the doctests use the `sc` global bound below.
    import doctest
    globs = globals().copy()
    from pyspark.sql import SparkSession
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("mllib.tree tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()