#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import sys
import os
import re
import operator
import shlex
import warnings
import heapq
import bisect
import random
import socket
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
from collections import defaultdict
from itertools import chain
from functools import reduce
from math import sqrt, log, isinf, isnan, pow, ceil

if sys.version > '3':
    basestring = unicode = str
else:
    from itertools import imap as map, ifilter as filter

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
    PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_full_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, ExternalMerger, \
    get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync


__all__ = ["RDD"]


def portable_hash(x):
    """
    This function returns a consistent hash code for builtin types, especially
    for None and tuples containing None.

    The algorithm is similar to the one used by CPython 2.7.

    >>> portable_hash(None)
    0
    >>> portable_hash((None, 1)) & 0xffffffff
    219750521
    """
    if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
        raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")

    if x is None:
        return 0
    if isinstance(x, tuple):
        h = 0x345678
        for i in x:
            h ^= portable_hash(i)
            h *= 1000003
            h &= sys.maxsize
        h ^= len(x)
        if h == -1:
            h = -2
        return int(h)
    return hash(x)


class BoundedFloat(float):
    """
    A bounded value generated by an approximate job, carrying a confidence,
    a low bound and a high bound.

    >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
    100.0
    """
    def __new__(cls, mean, confidence, low, high):
        obj = float.__new__(cls, mean)
        obj.confidence = confidence
        obj.low = low
        obj.high = high
        return obj


def _parse_memory(s):
    """
    Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
    return the value in MB.

    >>> _parse_memory("256m")
    256
    >>> _parse_memory("2g")
    2048
    """
    units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
    if s[-1].lower() not in units:
        raise ValueError("invalid format: " + s)
    return int(float(s[:-1]) * units[s[-1].lower()])


def _load_from_socket(port, serializer):
    sock = None
    # Support for both IPv4 and IPv6.
    # On most IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(3)
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    # The RDD materialization time is unpredictable; if we set a timeout for the socket
    # reading operation, it will very likely fail. See SPARK-18281.
    sock.settimeout(None)
    # The socket will be automatically closed when garbage-collected.
    return serializer.load_stream(sock.makefile("rb", 65536))


def ignore_unicode_prefix(f):
    """
    Ignore the 'u' prefix of strings in doc tests, to make them work
    in both Python 2 and 3.
    """
    if sys.version >= '3':
        # the representation of unicode strings in Python 3 does not have the prefix 'u',
        # so remove the prefix 'u' for doc tests
        literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
        f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
    return f


class Partitioner(object):
    def __init__(self, numPartitions, partitionFunc):
        self.numPartitions = numPartitions
        self.partitionFunc = partitionFunc

    def __eq__(self, other):
        return (isinstance(other, Partitioner) and self.numPartitions == other.numPartitions
                and self.partitionFunc == other.partitionFunc)

    def __call__(self, k):
        return self.partitionFunc(k) % self.numPartitions


class RDD(object):

    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer
        self._id = jrdd.id()
        self.partitioner = None

    def _pickled(self):
        return self._reserialize(AutoBatchedSerializer(PickleSerializer()))

    def id(self):
        """
        A unique ID for this RDD (within its SparkContext).
        """
        return self._id

    def __repr__(self):
        return self._jrdd.toString()

    def __getnewargs__(self):
        # This method is called when attempting to pickle an RDD, which is always an error:
        raise Exception(
            "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
            "action or transformation. RDD transformations and actions can only be invoked by the "
            "driver, not inside of other transformations; for example, "
            "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values "
            "transformation and count action cannot be performed inside of the rdd1.map "
            "transformation. For more information, see SPARK-5063."
        )

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self.persist(StorageLevel.MEMORY_ONLY)
        return self

    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        If no storage level is specified, defaults to (C{MEMORY_ONLY}).

        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> rdd.persist().is_cached
        True
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD is checkpointed and materialized, either reliably or locally.
        """
        return self._jrdd.rdd().isCheckpointed()

    def localCheckpoint(self):
        """
        Mark this RDD for local checkpointing using Spark's existing caching layer.

        This method is for users who wish to truncate RDD lineages while skipping the expensive
        step of replicating the materialized data in a reliable distributed file system. This is
        useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).

        Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
        data is written to ephemeral local storage in the executors instead of to a reliable,
        fault-tolerant storage. The effect is that if an executor fails during the computation,
        the checkpointed data may no longer be accessible, causing an irrecoverable job failure.

        This is NOT safe to use with dynamic allocation, which removes executors along
        with their cached blocks. If you must use both features, you are advised to set
        L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.

        The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
        """
        self._jrdd.rdd().localCheckpoint()

    def isLocallyCheckpointed(self):
        """
        Return whether this RDD is marked for local checkpointing.

        Exposed for testing.
        """
        return self._jrdd.rdd().isLocallyCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.

        Not defined if the RDD is checkpointed locally.
303 """ 304 checkpointFile = self._jrdd.rdd().getCheckpointFile() 305 if checkpointFile.isDefined(): 306 return checkpointFile.get() 307 308 def map(self, f, preservesPartitioning=False): 309 """ 310 Return a new RDD by applying a function to each element of this RDD. 311 312 >>> rdd = sc.parallelize(["b", "a", "c"]) 313 >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 314 [('a', 1), ('b', 1), ('c', 1)] 315 """ 316 def func(_, iterator): 317 return map(f, iterator) 318 return self.mapPartitionsWithIndex(func, preservesPartitioning) 319 320 def flatMap(self, f, preservesPartitioning=False): 321 """ 322 Return a new RDD by first applying a function to all elements of this 323 RDD, and then flattening the results. 324 325 >>> rdd = sc.parallelize([2, 3, 4]) 326 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 327 [1, 1, 1, 2, 2, 3] 328 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 329 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 330 """ 331 def func(s, iterator): 332 return chain.from_iterable(map(f, iterator)) 333 return self.mapPartitionsWithIndex(func, preservesPartitioning) 334 335 def mapPartitions(self, f, preservesPartitioning=False): 336 """ 337 Return a new RDD by applying a function to each partition of this RDD. 338 339 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 340 >>> def f(iterator): yield sum(iterator) 341 >>> rdd.mapPartitions(f).collect() 342 [3, 7] 343 """ 344 def func(s, iterator): 345 return f(iterator) 346 return self.mapPartitionsWithIndex(func, preservesPartitioning) 347 348 def mapPartitionsWithIndex(self, f, preservesPartitioning=False): 349 """ 350 Return a new RDD by applying a function to each partition of this RDD, 351 while tracking the index of the original partition. 352 353 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 354 >>> def f(splitIndex, iterator): yield splitIndex 355 >>> rdd.mapPartitionsWithIndex(f).sum() 356 6 357 """ 358 return PipelinedRDD(self, f, preservesPartitioning) 359 360 def mapPartitionsWithSplit(self, f, preservesPartitioning=False): 361 """ 362 Deprecated: use mapPartitionsWithIndex instead. 363 364 Return a new RDD by applying a function to each partition of this RDD, 365 while tracking the index of the original partition. 366 367 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 368 >>> def f(splitIndex, iterator): yield splitIndex 369 >>> rdd.mapPartitionsWithSplit(f).sum() 370 6 371 """ 372 warnings.warn("mapPartitionsWithSplit is deprecated; " 373 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 374 return self.mapPartitionsWithIndex(f, preservesPartitioning) 375 376 def getNumPartitions(self): 377 """ 378 Returns the number of partitions in RDD 379 380 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 381 >>> rdd.getNumPartitions() 382 2 383 """ 384 return self._jrdd.partitions().size() 385 386 def filter(self, f): 387 """ 388 Return a new RDD containing only the elements that satisfy a predicate. 389 390 >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 391 >>> rdd.filter(lambda x: x % 2 == 0).collect() 392 [2, 4] 393 """ 394 def func(iterator): 395 return filter(f, iterator) 396 return self.mapPartitions(func, True) 397 398 def distinct(self, numPartitions=None): 399 """ 400 Return a new RDD containing the distinct elements in this RDD. 

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x, numPartitions) \
                   .map(lambda x: x[0])

    def sample(self, withReplacement, fraction, seed=None):
        """
        Return a sampled subset of this RDD.

        :param withReplacement: can elements be sampled multiple times (replaced when sampled out)
        :param fraction: expected size of the sample as a fraction of this RDD's size
            without replacement: probability that each element is chosen; fraction must be [0, 1]
            with replacement: expected number of times each element is chosen; fraction must be >= 0
        :param seed: seed for the random number generator

        .. note:: This is not guaranteed to provide exactly the fraction specified of the total
            count of the given :class:`RDD`.

        >>> rdd = sc.parallelize(range(100), 4)
        >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
        True
        """
        assert fraction >= 0.0, "Negative fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def randomSplit(self, weights, seed=None):
        """
        Randomly splits this RDD with the provided weights.

        :param weights: weights for splits, will be normalized if they don't sum to 1
        :param seed: random seed
        :return: split RDDs in a list

        >>> rdd = sc.parallelize(range(500), 1)
        >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
        >>> len(rdd1.collect() + rdd2.collect())
        500
        >>> 150 < rdd1.count() < 250
        True
        >>> 250 < rdd2.count() < 350
        True
        """
        s = float(sum(weights))
        cweights = [0.0]
        for w in weights:
            cweights.append(cweights[-1] + w / s)
        if seed is None:
            seed = random.randint(0, 2 ** 32 - 1)
        return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True)
                for lb, ub in zip(cweights, cweights[1:])]

    # this is ported from scala/spark/RDD.scala
    def takeSample(self, withReplacement, num, seed=None):
        """
        Return a fixed-size sampled subset of this RDD.

        .. note:: This method should only be used if the resulting array is expected
            to be small, as all the data is loaded into the driver's memory.

        >>> rdd = sc.parallelize(range(0, 10))
        >>> len(rdd.takeSample(True, 20, 1))
        20
        >>> len(rdd.takeSample(False, 5, 2))
        5
        >>> len(rdd.takeSample(False, 15, 3))
        10
        """
        numStDev = 10.0

        if num < 0:
            raise ValueError("Sample size cannot be negative.")
        elif num == 0:
            return []

        initialCount = self.count()
        if initialCount == 0:
            return []

        rand = random.Random(seed)

        if (not withReplacement) and num >= initialCount:
            # shuffle current RDD and return
            samples = self.collect()
            rand.shuffle(samples)
            return samples

        maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
        if num > maxSampleSize:
            raise ValueError(
                "Sample size cannot be greater than %d." % maxSampleSize)

        fraction = RDD._computeFractionForSampleSize(
            num, initialCount, withReplacement)
        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample didn't turn out large enough, keep trying to take samples;
        # this shouldn't happen often because we use a big multiplier for their initial size.
        # See: scala/spark/RDD.scala
        while len(samples) < num:
            # TODO: add log warning for when more than one iteration was run
            seed = rand.randint(0, sys.maxsize)
            samples = self.sample(withReplacement, fraction, seed).collect()

        rand.shuffle(samples)

        return samples[0:num]

    @staticmethod
    def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
        """
        Returns a sampling rate that guarantees a sample of
        size >= sampleSizeLowerBound 99.99% of the time.

        How the sampling rate is determined:
        Let p = num / total, where num is the sample size and total is the
        total number of data points in the RDD. We're trying to compute
        q > p such that
          - when sampling with replacement, we're drawing each data point
            with prob_i ~ Pois(q), where we want to guarantee
            Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
            total), i.e. the failure rate of not having a sufficiently large
            sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
            to guarantee 0.9999 success rate for num > 12, but we need a
            slightly larger q (9 empirically determined).
          - when sampling without replacement, we're drawing each data point
            with prob_i ~ Binomial(total, fraction) and our choice of q
            guarantees 1-delta, or 0.9999 success rate, where success rate is
            defined the same as in sampling with replacement.
        """
        fraction = float(sampleSizeLowerBound) / total
        if withReplacement:
            numStDev = 5
            if (sampleSizeLowerBound < 12):
                numStDev = 9
            return fraction + numStDev * sqrt(fraction / total)
        else:
            delta = 0.00005
            gamma = - log(delta) / total
            return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            rdd = RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                      self.ctx.serializer)
        if (self.partitioner == other.partitioner and
                self.getNumPartitions() == rdd.getNumPartitions()):
            rdd.partitioner = self.partitioner
        return rdd

    def intersection(self, other):
        """
        Return the intersection of this RDD and another one. The output will
        not contain any duplicate elements, even if the input RDDs did.

        .. note:: This method performs a shuffle internally.

        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
        >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
        >>> rdd1.intersection(rdd2).collect()
        [1, 2, 3]
        """
        return self.map(lambda v: (v, None)) \
            .cogroup(other.map(lambda v: (v, None))) \
            .filter(lambda k_vs: all(k_vs[1])) \
            .keys()

    def _reserialize(self, serializer=None):
        serializer = serializer or self.ctx.serializer
        if self._jrdd_deserializer != serializer:
            self = self.map(lambda x: x, preservesPartitioning=True)
            self._jrdd_deserializer = serializer
        return self

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
                                           ascending=True, keyfunc=lambda x: x):
        """
        Repartition the RDD according to the given partitioner and, within each resulting partition,
        sort records by their keys.

        >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
        >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
        >>> rdd2.glom().collect()
        [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
        serializer = self._jrdd_deserializer

        def sortPartition(iterator):
            sort = ExternalSorter(memory * 0.9, serializer).sorted
            return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))

        return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.
        # noqa

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey().first()
        ('1', 3)
        >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        memory = self._memory_limit()
        serializer = self._jrdd_deserializer

        def sortPartition(iterator):
            sort = ExternalSorter(memory * 0.9, serializer).sorted
            return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))

        if numPartitions == 1:
            if self.getNumPartitions() > 1:
                self = self.coalesce(1)
            return self.mapPartitions(sortPartition, True)

        # first compute the boundary of each part via sampling: we want to partition
        # the key-space into bins such that the bins have roughly the same
        # number of (key, value) pairs falling into them
        rddSize = self.count()
        if not rddSize:
            return self  # empty RDD
        maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
        fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
        samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
        samples = sorted(samples, key=keyfunc)

        # we have numPartitions many parts but one of them has
        # an implicit boundary
        bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
                  for i in range(0, numPartitions - 1)]

        def rangePartitioner(k):
            p = bisect.bisect_left(bounds, keyfunc(k))
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        Sorts this RDD by the given keyfunc.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
        [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        """
        return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator):
            yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Due to batching, we can't use the Java cartesian method.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None, partitionFunc=portable_hash):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)

    @ignore_unicode_prefix
    def pipe(self, command, env=None, checkCode=False):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
        [u'1', u'2', u'', u'3']

        :param checkCode: whether or not to check the return value of the shell command.
        """
        if env is None:
            env = dict()

        def func(iterator):
            pipe = Popen(
                shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

            def pipe_objs(out):
                for obj in iterator:
                    s = str(obj).rstrip('\n') + '\n'
                    out.write(s.encode('utf-8'))
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()

            def check_return_code():
                pipe.wait()
                if checkCode and pipe.returncode:
                    raise Exception("Pipe function `%s' exited "
                                    "with error code %d" % (command, pipe.returncode))
                else:
                    for i in range(0):
                        yield i
            return (x.rstrip(b'\n').decode('utf-8') for x in
                    chain(iter(pipe.stdout.readline, b''), check_return_code()))
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print(x)
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            return iter([])
        self.mapPartitions(processPartition).count()  # Force evaluation

    def foreachPartition(self, f):
        """
        Applies a function to each partition of this RDD.

        >>> def f(iterator):
        ...     for x in iterator:
        ...         print(x)
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
        """
        def func(it):
            r = f(it)
            try:
                return iter(r)
            except TypeError:
                return iter([])
        self.mapPartitions(func).count()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.

        .. note:: This method should only be used if the resulting array is expected
            to be small, as all the data is loaded into the driver's memory.
        """
        with SCCallSiteSync(self.context) as css:
            port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
        return list(_load_from_socket(port, self._jrdd_deserializer))

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator. Currently reduces partitions locally.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        >>> sc.parallelize([]).reduce(add)
        Traceback (most recent call last):
            ...
        ValueError: Can not reduce() empty RDD
        """
        def func(iterator):
            iterator = iter(iterator)
            try:
                initial = next(iterator)
            except StopIteration:
                return
            yield reduce(f, iterator, initial)

        vals = self.mapPartitions(func).collect()
        if vals:
            return reduce(f, vals)
        raise ValueError("Can not reduce() empty RDD")

    def treeReduce(self, f, depth=2):
        """
        Reduces the elements of this RDD in a multi-level tree pattern.

        :param depth: suggested depth of the tree (default: 2)

        >>> add = lambda x, y: x + y
        >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
        >>> rdd.treeReduce(add)
        -5
        >>> rdd.treeReduce(add, 1)
        -5
        >>> rdd.treeReduce(add, 2)
        -5
        >>> rdd.treeReduce(add, 5)
        -5
        >>> rdd.treeReduce(add, 10)
        -5
        """
        if depth < 1:
            raise ValueError("Depth cannot be smaller than 1 but got %d." % depth)

        zeroValue = None, True  # Use the second entry to indicate whether this is a dummy value.

        def op(x, y):
            if x[1]:
                return y
            elif y[1]:
                return x
            else:
                return f(x[0], y[0]), False

        reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
        if reduced[1]:
            raise ValueError("Cannot reduce empty RDD.")
        return reduced[0]

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        This behaves somewhat differently from fold operations implemented
        for non-distributed collections in functional languages like Scala.
        This fold operation may be applied to partitions individually, and then
        fold those results into the final result, rather than apply the fold
        to each element sequentially in some defined ordering. For functions
        that are not commutative, the result may differ from that of a fold
        applied to a non-distributed collection.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(acc, obj)
            yield acc
        # collecting result of mapPartitions here ensures that the copy of
        # zeroValue provided to each partition is unique from the one provided
        # to the final reduce call
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using the given combine functions and a neutral "zero
        value."

        Each function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        The first function (seqOp) can return a different result type, U, than
        the type of this RDD. Thus, we need one operation for merging a T into
        a U and one operation for merging two U's.

        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
        (10, 4)
        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
        (0, 0)
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = seqOp(acc, obj)
            yield acc
        # collecting result of mapPartitions here ensures that the copy of
        # zeroValue provided to each partition is unique from the one provided
        # to the final reduce call
        vals = self.mapPartitions(func).collect()
        return reduce(combOp, vals, zeroValue)

    def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
        """
        Aggregates the elements of this RDD in a multi-level tree
        pattern.

        :param depth: suggested depth of the tree (default: 2)

        >>> add = lambda x, y: x + y
        >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
        >>> rdd.treeAggregate(0, add, add)
        -5
        >>> rdd.treeAggregate(0, add, add, 1)
        -5
        >>> rdd.treeAggregate(0, add, add, 2)
        -5
        >>> rdd.treeAggregate(0, add, add, 5)
        -5
        >>> rdd.treeAggregate(0, add, add, 10)
        -5
        """
        if depth < 1:
            raise ValueError("Depth cannot be smaller than 1 but got %d." % depth)

        if self.getNumPartitions() == 0:
            return zeroValue

        def aggregatePartition(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = seqOp(acc, obj)
            yield acc

        partiallyAggregated = self.mapPartitions(aggregatePartition)
        numPartitions = partiallyAggregated.getNumPartitions()
        scale = max(int(ceil(pow(numPartitions, 1.0 / depth))), 2)
        # If creating an extra level doesn't help reduce the wall-clock time, we stop the tree
        # aggregation.
        while numPartitions > scale + numPartitions / scale:
            numPartitions /= scale
            curNumPartitions = int(numPartitions)

            def mapPartition(i, iterator):
                for obj in iterator:
                    yield (i % curNumPartitions, obj)

            partiallyAggregated = partiallyAggregated \
                .mapPartitionsWithIndex(mapPartition) \
                .reduceByKey(combOp, curNumPartitions) \
                .values()

        return partiallyAggregated.reduce(combOp)

    def max(self, key=None):
        """
        Find the maximum item in this RDD.

        :param key: A function used to generate key for comparing

        >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
        >>> rdd.max()
        43.0
        >>> rdd.max(key=str)
        5.0
        """
        if key is None:
            return self.reduce(max)
        return self.reduce(lambda a, b: max(a, b, key=key))

    def min(self, key=None):
        """
        Find the minimum item in this RDD.

        :param key: A function used to generate key for comparing

        >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
        >>> rdd.min()
        2.0
        >>> rdd.min(key=str)
        10.0
        """
        if key is None:
            return self.reduce(min)
        return self.reduce(lambda a, b: min(a, b, key=key))

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def histogram(self, buckets):
        """
        Compute a histogram using the provided buckets. The buckets
        are all open to the right except for the last which is closed.
        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
        and 50 we would have a histogram of 1,0,1.

        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
        this can be switched from an O(log n) insertion to O(1) per
        element (where n is the number of buckets).

        Buckets must be sorted, not contain any duplicates, and have
        at least two elements.

        If `buckets` is a number, it will generate buckets which are
        evenly spaced between the minimum and maximum of the RDD. For
        example, if the min value is 0 and the max is 100, given `buckets`
        as 2, the resulting buckets will be [0,50) [50,100]. `buckets` must
        be at least 1. An exception is raised if the RDD contains infinity.
        If the elements in the RDD do not vary (max == min), a single bucket
        will be used.

        The return value is a tuple of buckets and histogram.

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
        ([0, 25, 50], [25, 26])
        >>> rdd.histogram([0, 5, 25, 50])
        ([0, 5, 25, 50], [5, 20, 26])
        >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
        >>> rdd.histogram(("a", "b", "c"))
        (('a', 'b', 'c'), [2, 2])
        """

        if isinstance(buckets, int):
            if buckets < 1:
                raise ValueError("number of buckets must be >= 1")

            # filter out non-comparable elements
            def comparable(x):
                if x is None:
                    return False
                if type(x) is float and isnan(x):
                    return False
                return True

            filtered = self.filter(comparable)

            # faster than stats()
            def minmax(a, b):
                return min(a[0], b[0]), max(a[1], b[1])
            try:
                minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
            except TypeError as e:
                if " empty " in str(e):
                    raise ValueError("can not generate buckets from empty RDD")
                raise

            if minv == maxv or buckets == 1:
                return [minv, maxv], [filtered.count()]

            try:
                inc = (maxv - minv) / buckets
            except TypeError:
                raise TypeError("Can not generate buckets with non-number in RDD")

            if isinf(inc):
                raise ValueError("Can not generate buckets with infinite value")

            # keep them as integer if possible
            inc = int(inc)
            if inc * buckets != maxv - minv:
                inc = (maxv - minv) * 1.0 / buckets

            buckets = [i * inc + minv for i in range(buckets)]
            buckets.append(maxv)  # fix accumulated error
            even = True

        elif isinstance(buckets, (list, tuple)):
            if len(buckets) < 2:
                raise ValueError("buckets should have more than one value")

            if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
                raise ValueError("can not have None or NaN in buckets")

            if sorted(buckets) != list(buckets):
                raise ValueError("buckets should be sorted")

            if len(set(buckets)) != len(buckets):
                raise ValueError("buckets should not contain duplicated values")

            minv = buckets[0]
            maxv = buckets[-1]
            even = False
            inc = None
            try:
                steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
            except TypeError:
                pass  # objects in buckets do not support '-'
            else:
                if max(steps) - min(steps) < 1e-10:  # handle precision errors
                    even = True
                    inc = (maxv - minv) / (len(buckets) - 1)

        else:
            raise TypeError("buckets should be a list or tuple or number(int or long)")

        def histogram(iterator):
            counters = [0] * len(buckets)
            for i in iterator:
                if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
                    continue
                t = (int((i - minv) / inc) if even
                     else bisect.bisect_right(buckets, i) - 1)
                counters[t] += 1
            # add last two together
            last = counters.pop()
            counters[-1] += last
            return [counters]

        def mergeCounters(a, b):
            return [i + j for i, j in zip(a, b)]

        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
1195 """ 1196 return self.stats().variance() 1197 1198 def stdev(self): 1199 """ 1200 Compute the standard deviation of this RDD's elements. 1201 1202 >>> sc.parallelize([1, 2, 3]).stdev() 1203 0.816... 1204 """ 1205 return self.stats().stdev() 1206 1207 def sampleStdev(self): 1208 """ 1209 Compute the sample standard deviation of this RDD's elements (which 1210 corrects for bias in estimating the standard deviation by dividing by 1211 N-1 instead of N). 1212 1213 >>> sc.parallelize([1, 2, 3]).sampleStdev() 1214 1.0 1215 """ 1216 return self.stats().sampleStdev() 1217 1218 def sampleVariance(self): 1219 """ 1220 Compute the sample variance of this RDD's elements (which corrects 1221 for bias in estimating the variance by dividing by N-1 instead of N). 1222 1223 >>> sc.parallelize([1, 2, 3]).sampleVariance() 1224 1.0 1225 """ 1226 return self.stats().sampleVariance() 1227 1228 def countByValue(self): 1229 """ 1230 Return the count of each unique value in this RDD as a dictionary of 1231 (value, count) pairs. 1232 1233 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 1234 [(1, 2), (2, 3)] 1235 """ 1236 def countPartition(iterator): 1237 counts = defaultdict(int) 1238 for obj in iterator: 1239 counts[obj] += 1 1240 yield counts 1241 1242 def mergeMaps(m1, m2): 1243 for k, v in m2.items(): 1244 m1[k] += v 1245 return m1 1246 return self.mapPartitions(countPartition).reduce(mergeMaps) 1247 1248 def top(self, num, key=None): 1249 """ 1250 Get the top N elements from an RDD. 1251 1252 .. note:: This method should only be used if the resulting array is expected 1253 to be small, as all the data is loaded into the driver's memory. 1254 1255 .. note:: It returns the list sorted in descending order. 1256 1257 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 1258 [12] 1259 >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2) 1260 [6, 5] 1261 >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str) 1262 [4, 3, 2] 1263 """ 1264 def topIterator(iterator): 1265 yield heapq.nlargest(num, iterator, key=key) 1266 1267 def merge(a, b): 1268 return heapq.nlargest(num, a + b, key=key) 1269 1270 return self.mapPartitions(topIterator).reduce(merge) 1271 1272 def takeOrdered(self, num, key=None): 1273 """ 1274 Get the N elements from an RDD ordered in ascending order or as 1275 specified by the optional key function. 1276 1277 .. note:: this method should only be used if the resulting array is expected 1278 to be small, as all the data is loaded into the driver's memory. 1279 1280 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 1281 [1, 2, 3, 4, 5, 6] 1282 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 1283 [10, 9, 7, 6, 5, 4] 1284 """ 1285 1286 def merge(a, b): 1287 return heapq.nsmallest(num, a + b, key) 1288 1289 return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge) 1290 1291 def take(self, num): 1292 """ 1293 Take the first num elements of the RDD. 1294 1295 It works by first scanning one partition, and use the results from 1296 that partition to estimate the number of additional partitions needed 1297 to satisfy the limit. 1298 1299 Translated from the Scala implementation in RDD#take(). 1300 1301 .. note:: this method should only be used if the resulting array is expected 1302 to be small, as all the data is loaded into the driver's memory. 

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
        [91, 92, 93]
        """
        items = []
        totalParts = self.getNumPartitions()
        partsScanned = 0

        while len(items) < num and partsScanned < totalParts:
            # The number of partitions to try in this iteration.
            # It is ok for this number to be greater than totalParts because
            # we actually cap it at totalParts in runJob.
            numPartsToTry = 1
            if partsScanned > 0:
                # If we didn't find any rows after the previous iteration,
                # quadruple and retry. Otherwise, interpolate the number of
                # partitions we need to try, but overestimate it by 50%.
                # We also cap the estimation in the end.
                if len(items) == 0:
                    numPartsToTry = partsScanned * 4
                else:
                    # the first parameter of max is >=1 whenever partsScanned >= 2
                    numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned
                    numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4)

            left = num - len(items)

            def takeUpToNumLeft(iterator):
                iterator = iter(iterator)
                taken = 0
                while taken < left:
                    yield next(iterator)
                    taken += 1

            p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
            res = self.context.runJob(self, takeUpToNumLeft, p)

            items += res
            partsScanned += numPartsToTry

        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        >>> sc.parallelize([]).first()
        Traceback (most recent call last):
            ...
        ValueError: RDD is empty
        """
        rs = self.take(1)
        if rs:
            return rs[0]
        raise ValueError("RDD is empty")

    def isEmpty(self):
        """
        Returns true if and only if the RDD contains no elements at all.

        .. note:: an RDD may be empty even when it has at least 1 partition.

        >>> sc.parallelize([]).isEmpty()
        True
        >>> sc.parallelize([1]).isEmpty()
        False
        """
        return self.getNumPartitions() == 0 or len(self.take(1)) == 0

    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
        converted for output using either user specified converters or, by default,
        L{org.apache.spark.api.python.JavaToWritableConverter}.

        :param conf: Hadoop job configuration, passed in as a dict
        :param keyConverter: (None by default)
        :param valueConverter: (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._pickled()
        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
                                                    keyConverter, valueConverter, True)

    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                               keyConverter=None, valueConverter=None, conf=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
        will be inferred if not specified. Keys and values are converted for output using either
        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

        :param path: path to Hadoop file
        :param outputFormatClass: fully qualified classname of Hadoop OutputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
        :param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
        :param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.Text", None by default)
        :param keyConverter: (None by default)
        :param valueConverter: (None by default)
        :param conf: Hadoop job configuration, passed in as a dict (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._pickled()
        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
                                                       outputFormatClass,
                                                       keyClass, valueClass,
                                                       keyConverter, valueConverter, jconf)

    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
        converted for output using either user specified converters or, by default,
        L{org.apache.spark.api.python.JavaToWritableConverter}.

        :param conf: Hadoop job configuration, passed in as a dict
        :param keyConverter: (None by default)
        :param valueConverter: (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._pickled()
        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
                                                    keyConverter, valueConverter, False)

    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                         keyConverter=None, valueConverter=None, conf=None,
                         compressionCodecClass=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the old Hadoop OutputFormat API (mapred package). Key and value types
        will be inferred if not specified. Keys and values are converted for output using either
        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

        :param path: path to Hadoop file
        :param outputFormatClass: fully qualified classname of Hadoop OutputFormat
               (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
        :param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
        :param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.Text", None by default)
        :param keyConverter: (None by default)
        :param valueConverter: (None by default)
        :param conf: (None by default)
        :param compressionCodecClass: (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._pickled()
        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
                                                 outputFormatClass,
                                                 keyClass, valueClass,
                                                 keyConverter, valueConverter,
                                                 jconf, compressionCodecClass)

    def saveAsSequenceFile(self, path, compressionCodecClass=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
        RDD's key and value types. The mechanism is as follows:

            1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
            2. Keys and values of this Java RDD are converted to Writables and written out.

        :param path: path to sequence file
        :param compressionCodecClass: (None by default)
        """
        pickledRDD = self._pickled()
        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
                                                   path, compressionCodecClass)

    def saveAsPickleFile(self, path, batchSize=10):
        """
        Save this RDD as a SequenceFile of serialized objects. The serializer
        used is L{pyspark.serializers.PickleSerializer}, default batch size
        is 10.

        >>> tmpFile = NamedTemporaryFile(delete=True)
        >>> tmpFile.close()
        >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
        >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
        ['1', '2', 'rdd', 'spark']
        """
        if batchSize == 0:
            ser = AutoBatchedSerializer(PickleSerializer())
        else:
            ser = BatchedSerializer(PickleSerializer(), batchSize)
        self._reserialize(ser)._jrdd.saveAsObjectFile(path)

    @ignore_unicode_prefix
    def saveAsTextFile(self, path, compressionCodecClass=None):
        """
        Save this RDD as a text file, using string representations of elements.

        @param path: path to text file
        @param compressionCodecClass: (None by default) string i.e.
            "org.apache.hadoop.io.compress.GzipCodec"

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

        Empty lines are tolerated when saving to text files.

        >>> tempFile2 = NamedTemporaryFile(delete=True)
        >>> tempFile2.close()
        >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
        '\\n\\n\\nbar\\nfoo\\n'

        Using compressionCodecClass

        >>> tempFile3 = NamedTemporaryFile(delete=True)
        >>> tempFile3.close()
        >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
        >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
        >>> from fileinput import input, hook_compressed
        >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
        >>> b''.join(result).decode('utf-8')
        u'bar\\nfoo\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, (unicode, bytes)):
                    x = unicode(x)
                if isinstance(x, unicode):
                    x = x.encode("utf-8")
                yield x
        keyed = self.mapPartitionsWithIndex(func)
        keyed._bypass_serializer = True
        if compressionCodecClass:
            compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass)
            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
        else:
            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        .. note:: this method should only be used if the resulting data is expected
            to be small, as all the data is loaded into the driver's memory.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def keys(self):
        """
        Return an RDD with the keys of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
        >>> m.collect()
        [1, 3]
        """
        return self.map(lambda x: x[0])

    def values(self):
        """
        Return an RDD with the values of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
        >>> m.collect()
        [2, 4]
        """
        return self.map(lambda x: x[1])

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
        """
        Merge the values for each key using an associative and commutative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.
        Default partitioner is hash-partition.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative and commutative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for k, v in iterator:
                m[k] = func(m[k], v) if k in m else v
            yield m

        def mergeMaps(m1, m2):
            for k, v in m2.items():
                m1[k] = func(m1[k], v) if k in m1 else v
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
        if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def fullOuterJoin(self, other, numPartitions=None):
        """
        Perform a full outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Similarly, for each element (k, w) in C{other}, the resulting RDD will
        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.
1709 1710 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1711 >>> y = sc.parallelize([("a", 2), ("c", 8)]) 1712 >>> sorted(x.fullOuterJoin(y).collect()) 1713 [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))] 1714 """ 1715 return python_full_outer_join(self, other, numPartitions) 1716 1717 # TODO: add option to control map-side combining 1718 # portable_hash is used as default, because builtin hash of None is different 1719 # cross machines. 1720 def partitionBy(self, numPartitions, partitionFunc=portable_hash): 1721 """ 1722 Return a copy of the RDD partitioned using the specified partitioner. 1723 1724 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 1725 >>> sets = pairs.partitionBy(2).glom().collect() 1726 >>> len(set(sets[0]).intersection(set(sets[1]))) 1727 0 1728 """ 1729 if numPartitions is None: 1730 numPartitions = self._defaultReducePartitions() 1731 partitioner = Partitioner(numPartitions, partitionFunc) 1732 if self.partitioner == partitioner: 1733 return self 1734 1735 # Transferring O(n) objects to Java is too expensive. 1736 # Instead, we'll form the hash buckets in Python, 1737 # transferring O(numPartitions) objects to Java. 1738 # Each object is a (splitNumber, [objects]) pair. 1739 # In order to avoid too huge objects, the objects are 1740 # grouped into chunks. 1741 outputSerializer = self.ctx._unbatched_serializer 1742 1743 limit = (_parse_memory(self.ctx._conf.get( 1744 "spark.python.worker.memory", "512m")) / 2) 1745 1746 def add_shuffle_key(split, iterator): 1747 1748 buckets = defaultdict(list) 1749 c, batch = 0, min(10 * numPartitions, 1000) 1750 1751 for k, v in iterator: 1752 buckets[partitionFunc(k) % numPartitions].append((k, v)) 1753 c += 1 1754 1755 # check used memory and avg size of chunk of objects 1756 if (c % 1000 == 0 and get_used_memory() > limit 1757 or c > batch): 1758 n, size = len(buckets), 0 1759 for split in list(buckets.keys()): 1760 yield pack_long(split) 1761 d = outputSerializer.dumps(buckets[split]) 1762 del buckets[split] 1763 yield d 1764 size += len(d) 1765 1766 avg = int(size / n) >> 20 1767 # let 1M < avg < 10M 1768 if avg < 1: 1769 batch *= 1.5 1770 elif avg > 10: 1771 batch = max(int(batch / 1.5), 1) 1772 c = 0 1773 1774 for split, items in buckets.items(): 1775 yield pack_long(split) 1776 yield outputSerializer.dumps(items) 1777 1778 keyed = self.mapPartitionsWithIndex(add_shuffle_key, preservesPartitioning=True) 1779 keyed._bypass_serializer = True 1780 with SCCallSiteSync(self.context) as css: 1781 pairRDD = self.ctx._jvm.PairwiseRDD( 1782 keyed._jrdd.rdd()).asJavaPairRDD() 1783 jpartitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 1784 id(partitionFunc)) 1785 jrdd = self.ctx._jvm.PythonRDD.valueOfPair(pairRDD.partitionBy(jpartitioner)) 1786 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 1787 rdd.partitioner = partitioner 1788 return rdd 1789 1790 # TODO: add control over map-side aggregation 1791 def combineByKey(self, createCombiner, mergeValue, mergeCombiners, 1792 numPartitions=None, partitionFunc=portable_hash): 1793 """ 1794 Generic function to combine the elements for each key using a custom 1795 set of aggregation functions. 1796 1797 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 1798 type" C. 
1799 1800 Users provide three functions: 1801 1802 - C{createCombiner}, which turns a V into a C (e.g., creates 1803 a one-element list) 1804 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 1805 a list) 1806 - C{mergeCombiners}, to combine two C's into a single one. 1807 1808 In addition, users can control the partitioning of the output RDD. 1809 1810 .. note:: V and C can be different -- for example, one might group an RDD of type 1811 (Int, Int) into an RDD of type (Int, List[Int]). 1812 1813 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1814 >>> def add(a, b): return a + str(b) 1815 >>> sorted(x.combineByKey(str, add, add).collect()) 1816 [('a', '11'), ('b', '1')] 1817 """ 1818 if numPartitions is None: 1819 numPartitions = self._defaultReducePartitions() 1820 1821 serializer = self.ctx.serializer 1822 memory = self._memory_limit() 1823 agg = Aggregator(createCombiner, mergeValue, mergeCombiners) 1824 1825 def combineLocally(iterator): 1826 merger = ExternalMerger(agg, memory * 0.9, serializer) 1827 merger.mergeValues(iterator) 1828 return merger.items() 1829 1830 locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True) 1831 shuffled = locally_combined.partitionBy(numPartitions, partitionFunc) 1832 1833 def _mergeCombiners(iterator): 1834 merger = ExternalMerger(agg, memory, serializer) 1835 merger.mergeCombiners(iterator) 1836 return merger.items() 1837 1838 return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True) 1839 1840 def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None, 1841 partitionFunc=portable_hash): 1842 """ 1843 Aggregate the values of each key, using given combine functions and a neutral 1844 "zero value". This function can return a different result type, U, than the type 1845 of the values in this RDD, V. Thus, we need one operation for merging a V into 1846 a U and one operation for merging two U's, The former operation is used for merging 1847 values within a partition, and the latter is used for merging values between 1848 partitions. To avoid memory allocation, both of these functions are 1849 allowed to modify and return their first argument instead of creating a new U. 1850 """ 1851 def createZero(): 1852 return copy.deepcopy(zeroValue) 1853 1854 return self.combineByKey( 1855 lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc) 1856 1857 def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash): 1858 """ 1859 Merge the values for each key using an associative function "func" 1860 and a neutral "zeroValue" which may be added to the result an 1861 arbitrary number of times, and must not change the result 1862 (e.g., 0 for addition, or 1 for multiplication.). 1863 1864 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1865 >>> from operator import add 1866 >>> sorted(rdd.foldByKey(0, add).collect()) 1867 [('a', 2), ('b', 1)] 1868 """ 1869 def createZero(): 1870 return copy.deepcopy(zeroValue) 1871 1872 return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions, 1873 partitionFunc) 1874 1875 def _memory_limit(self): 1876 return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m")) 1877 1878 # TODO: support variant with custom partitioner 1879 def groupByKey(self, numPartitions=None, partitionFunc=portable_hash): 1880 """ 1881 Group the values for each key in the RDD into a single sequence. 1882 Hash-partitions the resulting RDD with numPartitions partitions. 
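The values for each key are returned as an iterable, so the examples below apply C{len} and C{list} to them.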
1883 1884 .. note:: If you are grouping in order to perform an aggregation (such as a 1885 sum or average) over each key, using reduceByKey or aggregateByKey will 1886 provide much better performance. 1887 1888 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1889 >>> sorted(rdd.groupByKey().mapValues(len).collect()) 1890 [('a', 2), ('b', 1)] 1891 >>> sorted(rdd.groupByKey().mapValues(list).collect()) 1892 [('a', [1, 1]), ('b', [1])] 1893 """ 1894 def createCombiner(x): 1895 return [x] 1896 1897 def mergeValue(xs, x): 1898 xs.append(x) 1899 return xs 1900 1901 def mergeCombiners(a, b): 1902 a.extend(b) 1903 return a 1904 1905 memory = self._memory_limit() 1906 serializer = self._jrdd_deserializer 1907 agg = Aggregator(createCombiner, mergeValue, mergeCombiners) 1908 1909 def combine(iterator): 1910 merger = ExternalMerger(agg, memory * 0.9, serializer) 1911 merger.mergeValues(iterator) 1912 return merger.items() 1913 1914 locally_combined = self.mapPartitions(combine, preservesPartitioning=True) 1915 shuffled = locally_combined.partitionBy(numPartitions, partitionFunc) 1916 1917 def groupByKey(it): 1918 merger = ExternalGroupBy(agg, memory, serializer) 1919 merger.mergeCombiners(it) 1920 return merger.items() 1921 1922 return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable) 1923 1924 def flatMapValues(self, f): 1925 """ 1926 Pass each value in the key-value pair RDD through a flatMap function 1927 without changing the keys; this also retains the original RDD's 1928 partitioning. 1929 1930 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 1931 >>> def f(x): return x 1932 >>> x.flatMapValues(f).collect() 1933 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 1934 """ 1935 flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) 1936 return self.flatMap(flat_map_fn, preservesPartitioning=True) 1937 1938 def mapValues(self, f): 1939 """ 1940 Pass each value in the key-value pair RDD through a map function 1941 without changing the keys; this also retains the original RDD's 1942 partitioning. 1943 1944 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 1945 >>> def f(x): return len(x) 1946 >>> x.mapValues(f).collect() 1947 [('a', 3), ('b', 1)] 1948 """ 1949 map_values_fn = lambda kv: (kv[0], f(kv[1])) 1950 return self.map(map_values_fn, preservesPartitioning=True) 1951 1952 def groupWith(self, other, *others): 1953 """ 1954 Alias for cogroup but with support for multiple RDDs. 1955 1956 >>> w = sc.parallelize([("a", 5), ("b", 6)]) 1957 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1958 >>> y = sc.parallelize([("a", 2)]) 1959 >>> z = sc.parallelize([("b", 42)]) 1960 >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))] 1961 [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] 1962 1963 """ 1964 return python_cogroup((self, other) + others, numPartitions=None) 1965 1966 # TODO: add variant with custom parittioner 1967 def cogroup(self, other, numPartitions=None): 1968 """ 1969 For each key k in C{self} or C{other}, return a resulting RDD that 1970 contains a tuple with the list of values for that key in C{self} as 1971 well as C{other}. 
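For each key, the result holds a tuple of iterables (one per input RDD); the example below converts them to lists for display.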
1972 1973 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1974 >>> y = sc.parallelize([("a", 2)]) 1975 >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] 1976 [('a', ([1], [2])), ('b', ([4], []))] 1977 """ 1978 return python_cogroup((self, other), numPartitions) 1979 1980 def sampleByKey(self, withReplacement, fractions, seed=None): 1981 """ 1982 Return a subset of this RDD sampled by key (via stratified sampling). 1983 Create a sample of this RDD using variable sampling rates for 1984 different keys as specified by fractions, a key to sampling rate map. 1985 1986 >>> fractions = {"a": 0.2, "b": 0.1} 1987 >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000))) 1988 >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect()) 1989 >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150 1990 True 1991 >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0 1992 True 1993 >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0 1994 True 1995 """ 1996 for fraction in fractions.values(): 1997 assert fraction >= 0.0, "Negative fraction value: %s" % fraction 1998 return self.mapPartitionsWithIndex( 1999 RDDStratifiedSampler(withReplacement, fractions, seed).func, True) 2000 2001 def subtractByKey(self, other, numPartitions=None): 2002 """ 2003 Return each (key, value) pair in C{self} that has no pair with matching 2004 key in C{other}. 2005 2006 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 2007 >>> y = sc.parallelize([("a", 3), ("c", None)]) 2008 >>> sorted(x.subtractByKey(y).collect()) 2009 [('b', 4), ('b', 5)] 2010 """ 2011 def filter_func(pair): 2012 key, (val1, val2) = pair 2013 return val1 and not val2 2014 return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0]) 2015 2016 def subtract(self, other, numPartitions=None): 2017 """ 2018 Return each value in C{self} that is not contained in C{other}. 2019 2020 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 2021 >>> y = sc.parallelize([("a", 3), ("c", None)]) 2022 >>> sorted(x.subtract(y).collect()) 2023 [('a', 1), ('b', 4), ('b', 5)] 2024 """ 2025 # note: here 'True' is just a placeholder 2026 rdd = other.map(lambda x: (x, True)) 2027 return self.map(lambda x: (x, True)).subtractByKey(rdd, numPartitions).keys() 2028 2029 def keyBy(self, f): 2030 """ 2031 Creates tuples of the elements in this RDD by applying C{f}. 2032 2033 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 2034 >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 2035 >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())] 2036 [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])] 2037 """ 2038 return self.map(lambda x: (f(x), x)) 2039 2040 def repartition(self, numPartitions): 2041 """ 2042 Return a new RDD that has exactly numPartitions partitions. 2043 2044 Can increase or decrease the level of parallelism in this RDD. 2045 Internally, this uses a shuffle to redistribute data. 2046 If you are decreasing the number of partitions in this RDD, consider 2047 using `coalesce`, which can avoid performing a shuffle. 
2048 2049 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 2050 >>> sorted(rdd.glom().collect()) 2051 [[1], [2, 3], [4, 5], [6, 7]] 2052 >>> len(rdd.repartition(2).glom().collect()) 2053 2 2054 >>> len(rdd.repartition(10).glom().collect()) 2055 10 2056 """ 2057 return self.coalesce(numPartitions, shuffle=True) 2058 2059 def coalesce(self, numPartitions, shuffle=False): 2060 """ 2061 Return a new RDD that is reduced into `numPartitions` partitions. 2062 2063 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 2064 [[1], [2, 3], [4, 5]] 2065 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 2066 [[1, 2, 3, 4, 5]] 2067 """ 2068 if shuffle: 2069 # Decrease the batch size in order to distribute evenly the elements across output 2070 # partitions. Otherwise, repartition will possibly produce highly skewed partitions. 2071 batchSize = min(10, self.ctx._batchSize or 1024) 2072 ser = BatchedSerializer(PickleSerializer(), batchSize) 2073 selfCopy = self._reserialize(ser) 2074 jrdd_deserializer = selfCopy._jrdd_deserializer 2075 jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle) 2076 else: 2077 jrdd_deserializer = self._jrdd_deserializer 2078 jrdd = self._jrdd.coalesce(numPartitions, shuffle) 2079 return RDD(jrdd, self.ctx, jrdd_deserializer) 2080 2081 def zip(self, other): 2082 """ 2083 Zips this RDD with another one, returning key-value pairs with the 2084 first element in each RDD, second element in each RDD, etc. Assumes 2085 that the two RDDs have the same number of partitions and the same 2086 number of elements in each partition (e.g. one was made through 2087 a map on the other). 2088 2089 >>> x = sc.parallelize(range(0,5)) 2090 >>> y = sc.parallelize(range(1000, 1005)) 2091 >>> x.zip(y).collect() 2092 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 2093 """ 2094 def get_batch_size(ser): 2095 if isinstance(ser, BatchedSerializer): 2096 return ser.batchSize 2097 return 1 # not batched 2098 2099 def batch_as(rdd, batchSize): 2100 return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize)) 2101 2102 my_batch = get_batch_size(self._jrdd_deserializer) 2103 other_batch = get_batch_size(other._jrdd_deserializer) 2104 if my_batch != other_batch or not my_batch: 2105 # use the smallest batchSize for both of them 2106 batchSize = min(my_batch, other_batch) 2107 if batchSize <= 0: 2108 # auto batched or unlimited 2109 batchSize = 100 2110 other = batch_as(other, batchSize) 2111 self = batch_as(self, batchSize) 2112 2113 if self.getNumPartitions() != other.getNumPartitions(): 2114 raise ValueError("Can only zip with RDD which has the same number of partitions") 2115 2116 # There will be an Exception in the JVM if the number of items 2117 # differs between partitions. 2118 pairRDD = self._jrdd.zip(other._jrdd) 2119 deserializer = PairDeserializer(self._jrdd_deserializer, 2120 other._jrdd_deserializer) 2121 return RDD(pairRDD, self.ctx, deserializer) 2122 2123 def zipWithIndex(self): 2124 """ 2125 Zips this RDD with its element indices. 2126 2127 The ordering is first based on the partition index and then the 2128 ordering of items within each partition. So the first item in 2129 the first partition gets index 0, and the last item in the last 2130 partition receives the largest index. 2131 2132 This method needs to trigger a Spark job when this RDD contains 2133 more than one partition.
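(The job only counts the number of elements in each partition so that the per-partition index offsets can be computed.)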
2134 2135 >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect() 2136 [('a', 0), ('b', 1), ('c', 2), ('d', 3)] 2137 """ 2138 starts = [0] 2139 if self.getNumPartitions() > 1: 2140 nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect() 2141 for i in range(len(nums) - 1): 2142 starts.append(starts[-1] + nums[i]) 2143 2144 def func(k, it): 2145 for i, v in enumerate(it, starts[k]): 2146 yield v, i 2147 2148 return self.mapPartitionsWithIndex(func) 2149 2150 def zipWithUniqueId(self): 2151 """ 2152 Zips this RDD with generated unique Long ids. 2153 2154 Items in the kth partition will get ids k, n+k, 2*n+k, ..., where 2155 n is the number of partitions. So there may exist gaps, but this 2156 method won't trigger a spark job, which is different from 2157 L{zipWithIndex} 2158 2159 >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() 2160 [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] 2161 """ 2162 n = self.getNumPartitions() 2163 2164 def func(k, it): 2165 for i, v in enumerate(it): 2166 yield v, i * n + k 2167 2168 return self.mapPartitionsWithIndex(func) 2169 2170 def name(self): 2171 """ 2172 Return the name of this RDD. 2173 """ 2174 n = self._jrdd.name() 2175 if n: 2176 return n 2177 2178 @ignore_unicode_prefix 2179 def setName(self, name): 2180 """ 2181 Assign a name to this RDD. 2182 2183 >>> rdd1 = sc.parallelize([1, 2]) 2184 >>> rdd1.setName('RDD1').name() 2185 u'RDD1' 2186 """ 2187 self._jrdd.setName(name) 2188 return self 2189 2190 def toDebugString(self): 2191 """ 2192 A description of this RDD and its recursive dependencies for debugging. 2193 """ 2194 debug_string = self._jrdd.toDebugString() 2195 if debug_string: 2196 return debug_string.encode('utf-8') 2197 2198 def getStorageLevel(self): 2199 """ 2200 Get the RDD's current storage level. 2201 2202 >>> rdd1 = sc.parallelize([1,2]) 2203 >>> rdd1.getStorageLevel() 2204 StorageLevel(False, False, False, False, 1) 2205 >>> print(rdd1.getStorageLevel()) 2206 Serialized 1x Replicated 2207 """ 2208 java_storage_level = self._jrdd.getStorageLevel() 2209 storage_level = StorageLevel(java_storage_level.useDisk(), 2210 java_storage_level.useMemory(), 2211 java_storage_level.useOffHeap(), 2212 java_storage_level.deserialized(), 2213 java_storage_level.replication()) 2214 return storage_level 2215 2216 def _defaultReducePartitions(self): 2217 """ 2218 Returns the default number of partitions to use during reduce tasks (e.g., groupBy). 2219 If spark.default.parallelism is set, then we'll use the value from SparkContext 2220 defaultParallelism, otherwise we'll use the number of partitions in this RDD. 2221 2222 This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce 2223 the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will 2224 be inherent. 2225 """ 2226 if self.ctx._conf.contains("spark.default.parallelism"): 2227 return self.ctx.defaultParallelism 2228 else: 2229 return self.getNumPartitions() 2230 2231 def lookup(self, key): 2232 """ 2233 Return the list of values in the RDD for key `key`. This operation 2234 is done efficiently if the RDD has a known partitioner by only 2235 searching the partition that the key maps to. 
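In the example below, C{sortByKey} leaves the RDD with a known partitioner, so the subsequent lookups touch only a single partition instead of scanning the whole RDD.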
2236 2237 >>> l = range(1000) 2238 >>> rdd = sc.parallelize(zip(l, l), 10) 2239 >>> rdd.lookup(42) # slow 2240 [42] 2241 >>> sorted = rdd.sortByKey() 2242 >>> sorted.lookup(42) # fast 2243 [42] 2244 >>> sorted.lookup(1024) 2245 [] 2246 >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey() 2247 >>> list(rdd2.lookup(('a', 'b'))[0]) 2248 ['c'] 2249 """ 2250 values = self.filter(lambda kv: kv[0] == key).values() 2251 2252 if self.partitioner is not None: 2253 return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)]) 2254 2255 return values.collect() 2256 2257 def _to_java_object_rdd(self): 2258 """ Return a JavaRDD of Object by unpickling 2259 2260 It will convert each Python object into Java object by Pyrolite, whenever the 2261 RDD is serialized in batch or not. 2262 """ 2263 rdd = self._pickled() 2264 return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True) 2265 2266 def countApprox(self, timeout, confidence=0.95): 2267 """ 2268 .. note:: Experimental 2269 2270 Approximate version of count() that returns a potentially incomplete 2271 result within a timeout, even if not all tasks have finished. 2272 2273 >>> rdd = sc.parallelize(range(1000), 10) 2274 >>> rdd.countApprox(1000, 1.0) 2275 1000 2276 """ 2277 drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))]) 2278 return int(drdd.sumApprox(timeout, confidence)) 2279 2280 def sumApprox(self, timeout, confidence=0.95): 2281 """ 2282 .. note:: Experimental 2283 2284 Approximate operation to return the sum within a timeout 2285 or meet the confidence. 2286 2287 >>> rdd = sc.parallelize(range(1000), 10) 2288 >>> r = sum(range(1000)) 2289 >>> abs(rdd.sumApprox(1000) - r) / r < 0.05 2290 True 2291 """ 2292 jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd() 2293 jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) 2294 r = jdrdd.sumApprox(timeout, confidence).getFinalValue() 2295 return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) 2296 2297 def meanApprox(self, timeout, confidence=0.95): 2298 """ 2299 .. note:: Experimental 2300 2301 Approximate operation to return the mean within a timeout 2302 or meet the confidence. 2303 2304 >>> rdd = sc.parallelize(range(1000), 10) 2305 >>> r = sum(range(1000)) / 1000.0 2306 >>> abs(rdd.meanApprox(1000) - r) / r < 0.05 2307 True 2308 """ 2309 jrdd = self.map(float)._to_java_object_rdd() 2310 jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) 2311 r = jdrdd.meanApprox(timeout, confidence).getFinalValue() 2312 return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) 2313 2314 def countApproxDistinct(self, relativeSD=0.05): 2315 """ 2316 .. note:: Experimental 2317 2318 Return approximate number of distinct elements in the RDD. 2319 2320 The algorithm used is based on streamlib's implementation of 2321 `"HyperLogLog in Practice: Algorithmic Engineering of a State 2322 of The Art Cardinality Estimation Algorithm", available here 2323 <http://dx.doi.org/10.1145/2452376.2452456>`_. 2324 2325 :param relativeSD: Relative accuracy. Smaller values create 2326 counters that require more space. 2327 It must be greater than 0.000017. 
2328 2329 >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct() 2330 >>> 900 < n < 1100 2331 True 2332 >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct() 2333 >>> 16 < n < 24 2334 True 2335 """ 2336 if relativeSD < 0.000017: 2337 raise ValueError("relativeSD should be greater than 0.000017") 2338 # the hash space in Java is 2^32 2339 hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) 2340 return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) 2341 2342 def toLocalIterator(self): 2343 """ 2344 Return an iterator that contains all of the elements in this RDD. 2345 The iterator will consume as much memory as the largest partition in this RDD. 2346 2347 >>> rdd = sc.parallelize(range(10)) 2348 >>> [x for x in rdd.toLocalIterator()] 2349 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2350 """ 2351 with SCCallSiteSync(self.context) as css: 2352 port = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd()) 2353 return _load_from_socket(port, self._jrdd_deserializer) 2354 2355 2356def _prepare_for_python_RDD(sc, command): 2357 # the serialized command will be compressed by broadcast 2358 ser = CloudPickleSerializer() 2359 pickled_command = ser.dumps(command) 2360 if len(pickled_command) > (1 << 20): # 1M 2361 # The broadcast will have same life cycle as created PythonRDD 2362 broadcast = sc.broadcast(pickled_command) 2363 pickled_command = ser.dumps(broadcast) 2364 broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] 2365 sc._pickled_broadcast_vars.clear() 2366 return pickled_command, broadcast_vars, sc.environment, sc._python_includes 2367 2368 2369def _wrap_function(sc, func, deserializer, serializer, profiler=None): 2370 assert deserializer, "deserializer should not be empty" 2371 assert serializer, "serializer should not be empty" 2372 command = (func, profiler, deserializer, serializer) 2373 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) 2374 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec, 2375 sc.pythonVer, broadcast_vars, sc._javaAccumulator) 2376 2377 2378class PipelinedRDD(RDD): 2379 2380 """ 2381 Pipelined maps: 2382 2383 >>> rdd = sc.parallelize([1, 2, 3, 4]) 2384 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 2385 [4, 8, 12, 16] 2386 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 2387 [4, 8, 12, 16] 2388 2389 Pipelined reduces: 2390 >>> from operator import add 2391 >>> rdd.map(lambda x: 2 * x).reduce(add) 2392 20 2393 >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 2394 20 2395 """ 2396 2397 def __init__(self, prev, func, preservesPartitioning=False): 2398 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 2399 # This transformation is the first in its stage: 2400 self.func = func 2401 self.preservesPartitioning = preservesPartitioning 2402 self._prev_jrdd = prev._jrdd 2403 self._prev_jrdd_deserializer = prev._jrdd_deserializer 2404 else: 2405 prev_func = prev.func 2406 2407 def pipeline_func(split, iterator): 2408 return func(split, prev_func(split, iterator)) 2409 self.func = pipeline_func 2410 self.preservesPartitioning = \ 2411 prev.preservesPartitioning and preservesPartitioning 2412 self._prev_jrdd = prev._prev_jrdd # maintain the pipeline 2413 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 2414 self.is_cached = False 2415 self.is_checkpointed = False 2416 self.ctx = prev.ctx 2417 self.prev = prev 2418 self._jrdd_val = None 2419 self._id = None 2420 
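# The wrapped Java RDD is constructed lazily by the _jrdd property below and cached in _jrdd_val.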
self._jrdd_deserializer = self.ctx.serializer 2421 self._bypass_serializer = False 2422 self.partitioner = prev.partitioner if self.preservesPartitioning else None 2423 2424 def getNumPartitions(self): 2425 return self._prev_jrdd.partitions().size() 2426 2427 @property 2428 def _jrdd(self): 2429 if self._jrdd_val: 2430 return self._jrdd_val 2431 if self._bypass_serializer: 2432 self._jrdd_deserializer = NoOpSerializer() 2433 2434 if self.ctx.profiler_collector: 2435 profiler = self.ctx.profiler_collector.new_profiler(self.ctx) 2436 else: 2437 profiler = None 2438 2439 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer, 2440 self._jrdd_deserializer, profiler) 2441 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func, 2442 self.preservesPartitioning) 2443 self._jrdd_val = python_rdd.asJavaRDD() 2444 2445 if profiler: 2446 self._id = self._jrdd_val.id() 2447 self.ctx.profiler_collector.add_profiler(self._id, profiler) 2448 return self._jrdd_val 2449 2450 def id(self): 2451 if self._id is None: 2452 self._id = self._jrdd.id() 2453 return self._id 2454 2455 def _is_pipelinable(self): 2456 return not (self.is_cached or self.is_checkpointed) 2457 2458 2459def _test(): 2460 import doctest 2461 from pyspark.context import SparkContext 2462 globs = globals().copy() 2463 # The small batch size here ensures that we see multiple batches, 2464 # even in these small test examples: 2465 globs['sc'] = SparkContext('local[4]', 'PythonTest') 2466 (failure_count, test_count) = doctest.testmod( 2467 globs=globs, optionflags=doctest.ELLIPSIS) 2468 globs['sc'].stop() 2469 if failure_count: 2470 exit(-1) 2471 2472 2473if __name__ == "__main__": 2474 _test() 2475