1#
2# Licensed to the Apache Software Foundation (ASF) under one or more
3# contributor license agreements.  See the NOTICE file distributed with
4# this work for additional information regarding copyright ownership.
5# The ASF licenses this file to You under the Apache License, Version 2.0
6# (the "License"); you may not use this file except in compliance with
7# the License.  You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import copy
19import sys
20import os
21import re
22import operator
23import shlex
24import warnings
25import heapq
26import bisect
27import random
28import socket
29from subprocess import Popen, PIPE
30from tempfile import NamedTemporaryFile
31from threading import Thread
32from collections import defaultdict
33from itertools import chain
34from functools import reduce
35from math import sqrt, log, isinf, isnan, pow, ceil
36
37if sys.version > '3':
38    basestring = unicode = str
39else:
40    from itertools import imap as map, ifilter as filter
41
42from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
43    BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
44    PickleSerializer, pack_long, AutoBatchedSerializer
45from pyspark.join import python_join, python_left_outer_join, \
46    python_right_outer_join, python_full_outer_join, python_cogroup
47from pyspark.statcounter import StatCounter
48from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
49from pyspark.storagelevel import StorageLevel
50from pyspark.resultiterable import ResultIterable
51from pyspark.shuffle import Aggregator, ExternalMerger, \
52    get_used_memory, ExternalSorter, ExternalGroupBy
53from pyspark.traceback_utils import SCCallSiteSync
54
55
56__all__ = ["RDD"]
57
58
59def portable_hash(x):
60    """
61    This function returns consistent hash code for builtin types, especially
62    for None and tuple with None.
63
64    The algorithm is similar to that one used by CPython 2.7
65
66    >>> portable_hash(None)
67    0
68    >>> portable_hash((None, 1)) & 0xffffffff
69    219750521
70    """
71    if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
72        raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
73
74    if x is None:
75        return 0
76    if isinstance(x, tuple):
77        h = 0x345678
78        for i in x:
79            h ^= portable_hash(i)
80            h *= 1000003
81            h &= sys.maxsize
82        h ^= len(x)
83        if h == -1:
84            h = -2
85        return int(h)
86    return hash(x)
87
88
89class BoundedFloat(float):
90    """
91    Bounded value is generated by approximate job, with confidence and low
92    bound and high bound.
93
94    >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
95    100.0
96    """
97    def __new__(cls, mean, confidence, low, high):
98        obj = float.__new__(cls, mean)
99        obj.confidence = confidence
100        obj.low = low
101        obj.high = high
102        return obj
103
104
105def _parse_memory(s):
106    """
107    Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
108    return the value in MB
109
110    >>> _parse_memory("256m")
111    256
112    >>> _parse_memory("2g")
113    2048
114    """
115    units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
116    if s[-1].lower() not in units:
117        raise ValueError("invalid format: " + s)
118    return int(float(s[:-1]) * units[s[-1].lower()])
119
120
121def _load_from_socket(port, serializer):
122    sock = None
123    # Support for both IPv4 and IPv6.
124    # On most of IPv6-ready systems, IPv6 will take precedence.
125    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
126        af, socktype, proto, canonname, sa = res
127        sock = socket.socket(af, socktype, proto)
128        try:
129            sock.settimeout(3)
130            sock.connect(sa)
131        except socket.error:
132            sock.close()
133            sock = None
134            continue
135        break
136    if not sock:
137        raise Exception("could not open socket")
138    # The RDD materialization time is unpredicable, if we set a timeout for socket reading
139    # operation, it will very possibly fail. See SPARK-18281.
140    sock.settimeout(None)
141    # The socket will be automatically closed when garbage-collected.
142    return serializer.load_stream(sock.makefile("rb", 65536))
143
144
145def ignore_unicode_prefix(f):
146    """
147    Ignore the 'u' prefix of string in doc tests, to make it works
148    in both python 2 and 3
149    """
150    if sys.version >= '3':
151        # the representation of unicode string in Python 3 does not have prefix 'u',
152        # so remove the prefix 'u' for doc tests
153        literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
154        f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
155    return f
156
157
158class Partitioner(object):
159    def __init__(self, numPartitions, partitionFunc):
160        self.numPartitions = numPartitions
161        self.partitionFunc = partitionFunc
162
163    def __eq__(self, other):
164        return (isinstance(other, Partitioner) and self.numPartitions == other.numPartitions
165                and self.partitionFunc == other.partitionFunc)
166
167    def __call__(self, k):
168        return self.partitionFunc(k) % self.numPartitions
169
170
171class RDD(object):
172
173    """
174    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
175    Represents an immutable, partitioned collection of elements that can be
176    operated on in parallel.
177    """
178
179    def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
180        self._jrdd = jrdd
181        self.is_cached = False
182        self.is_checkpointed = False
183        self.ctx = ctx
184        self._jrdd_deserializer = jrdd_deserializer
185        self._id = jrdd.id()
186        self.partitioner = None
187
188    def _pickled(self):
189        return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
190
191    def id(self):
192        """
193        A unique ID for this RDD (within its SparkContext).
194        """
195        return self._id
196
197    def __repr__(self):
198        return self._jrdd.toString()
199
200    def __getnewargs__(self):
201        # This method is called when attempting to pickle an RDD, which is always an error:
202        raise Exception(
203            "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
204            "action or transformation. RDD transformations and actions can only be invoked by the "
205            "driver, not inside of other transformations; for example, "
206            "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values "
207            "transformation and count action cannot be performed inside of the rdd1.map "
208            "transformation. For more information, see SPARK-5063."
209        )
210
211    @property
212    def context(self):
213        """
214        The L{SparkContext} that this RDD was created on.
215        """
216        return self.ctx
217
218    def cache(self):
219        """
220        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
221        """
222        self.is_cached = True
223        self.persist(StorageLevel.MEMORY_ONLY)
224        return self
225
226    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
227        """
228        Set this RDD's storage level to persist its values across operations
229        after the first time it is computed. This can only be used to assign
230        a new storage level if the RDD does not have a storage level set yet.
231        If no storage level is specified defaults to (C{MEMORY_ONLY}).
232
233        >>> rdd = sc.parallelize(["b", "a", "c"])
234        >>> rdd.persist().is_cached
235        True
236        """
237        self.is_cached = True
238        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
239        self._jrdd.persist(javaStorageLevel)
240        return self
241
242    def unpersist(self):
243        """
244        Mark the RDD as non-persistent, and remove all blocks for it from
245        memory and disk.
246        """
247        self.is_cached = False
248        self._jrdd.unpersist()
249        return self
250
251    def checkpoint(self):
252        """
253        Mark this RDD for checkpointing. It will be saved to a file inside the
254        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
255        all references to its parent RDDs will be removed. This function must
256        be called before any job has been executed on this RDD. It is strongly
257        recommended that this RDD is persisted in memory, otherwise saving it
258        on a file will require recomputation.
259        """
260        self.is_checkpointed = True
261        self._jrdd.rdd().checkpoint()
262
263    def isCheckpointed(self):
264        """
265        Return whether this RDD is checkpointed and materialized, either reliably or locally.
266        """
267        return self._jrdd.rdd().isCheckpointed()
268
269    def localCheckpoint(self):
270        """
271        Mark this RDD for local checkpointing using Spark's existing caching layer.
272
273        This method is for users who wish to truncate RDD lineages while skipping the expensive
274        step of replicating the materialized data in a reliable distributed file system. This is
275        useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
276
277        Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
278        data is written to ephemeral local storage in the executors instead of to a reliable,
279        fault-tolerant storage. The effect is that if an executor fails during the computation,
280        the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
281
282        This is NOT safe to use with dynamic allocation, which removes executors along
283        with their cached blocks. If you must use both features, you are advised to set
284        L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
285
286        The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
287        """
288        self._jrdd.rdd().localCheckpoint()
289
290    def isLocallyCheckpointed(self):
291        """
292        Return whether this RDD is marked for local checkpointing.
293
294        Exposed for testing.
295        """
296        return self._jrdd.rdd().isLocallyCheckpointed()
297
298    def getCheckpointFile(self):
299        """
300        Gets the name of the file to which this RDD was checkpointed
301
302        Not defined if RDD is checkpointed locally.
303        """
304        checkpointFile = self._jrdd.rdd().getCheckpointFile()
305        if checkpointFile.isDefined():
306            return checkpointFile.get()
307
308    def map(self, f, preservesPartitioning=False):
309        """
310        Return a new RDD by applying a function to each element of this RDD.
311
312        >>> rdd = sc.parallelize(["b", "a", "c"])
313        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
314        [('a', 1), ('b', 1), ('c', 1)]
315        """
316        def func(_, iterator):
317            return map(f, iterator)
318        return self.mapPartitionsWithIndex(func, preservesPartitioning)
319
320    def flatMap(self, f, preservesPartitioning=False):
321        """
322        Return a new RDD by first applying a function to all elements of this
323        RDD, and then flattening the results.
324
325        >>> rdd = sc.parallelize([2, 3, 4])
326        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
327        [1, 1, 1, 2, 2, 3]
328        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
329        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
330        """
331        def func(s, iterator):
332            return chain.from_iterable(map(f, iterator))
333        return self.mapPartitionsWithIndex(func, preservesPartitioning)
334
335    def mapPartitions(self, f, preservesPartitioning=False):
336        """
337        Return a new RDD by applying a function to each partition of this RDD.
338
339        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
340        >>> def f(iterator): yield sum(iterator)
341        >>> rdd.mapPartitions(f).collect()
342        [3, 7]
343        """
344        def func(s, iterator):
345            return f(iterator)
346        return self.mapPartitionsWithIndex(func, preservesPartitioning)
347
348    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
349        """
350        Return a new RDD by applying a function to each partition of this RDD,
351        while tracking the index of the original partition.
352
353        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
354        >>> def f(splitIndex, iterator): yield splitIndex
355        >>> rdd.mapPartitionsWithIndex(f).sum()
356        6
357        """
358        return PipelinedRDD(self, f, preservesPartitioning)
359
360    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
361        """
362        Deprecated: use mapPartitionsWithIndex instead.
363
364        Return a new RDD by applying a function to each partition of this RDD,
365        while tracking the index of the original partition.
366
367        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
368        >>> def f(splitIndex, iterator): yield splitIndex
369        >>> rdd.mapPartitionsWithSplit(f).sum()
370        6
371        """
372        warnings.warn("mapPartitionsWithSplit is deprecated; "
373                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
374        return self.mapPartitionsWithIndex(f, preservesPartitioning)
375
376    def getNumPartitions(self):
377        """
378        Returns the number of partitions in RDD
379
380        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
381        >>> rdd.getNumPartitions()
382        2
383        """
384        return self._jrdd.partitions().size()
385
386    def filter(self, f):
387        """
388        Return a new RDD containing only the elements that satisfy a predicate.
389
390        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
391        >>> rdd.filter(lambda x: x % 2 == 0).collect()
392        [2, 4]
393        """
394        def func(iterator):
395            return filter(f, iterator)
396        return self.mapPartitions(func, True)
397
398    def distinct(self, numPartitions=None):
399        """
400        Return a new RDD containing the distinct elements in this RDD.
401
402        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
403        [1, 2, 3]
404        """
405        return self.map(lambda x: (x, None)) \
406                   .reduceByKey(lambda x, _: x, numPartitions) \
407                   .map(lambda x: x[0])
408
409    def sample(self, withReplacement, fraction, seed=None):
410        """
411        Return a sampled subset of this RDD.
412
413        :param withReplacement: can elements be sampled multiple times (replaced when sampled out)
414        :param fraction: expected size of the sample as a fraction of this RDD's size
415            without replacement: probability that each element is chosen; fraction must be [0, 1]
416            with replacement: expected number of times each element is chosen; fraction must be >= 0
417        :param seed: seed for the random number generator
418
419        .. note:: This is not guaranteed to provide exactly the fraction specified of the total
420            count of the given :class:`DataFrame`.
421
422        >>> rdd = sc.parallelize(range(100), 4)
423        >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
424        True
425        """
426        assert fraction >= 0.0, "Negative fraction value: %s" % fraction
427        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
428
429    def randomSplit(self, weights, seed=None):
430        """
431        Randomly splits this RDD with the provided weights.
432
433        :param weights: weights for splits, will be normalized if they don't sum to 1
434        :param seed: random seed
435        :return: split RDDs in a list
436
437        >>> rdd = sc.parallelize(range(500), 1)
438        >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
439        >>> len(rdd1.collect() + rdd2.collect())
440        500
441        >>> 150 < rdd1.count() < 250
442        True
443        >>> 250 < rdd2.count() < 350
444        True
445        """
446        s = float(sum(weights))
447        cweights = [0.0]
448        for w in weights:
449            cweights.append(cweights[-1] + w / s)
450        if seed is None:
451            seed = random.randint(0, 2 ** 32 - 1)
452        return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True)
453                for lb, ub in zip(cweights, cweights[1:])]
454
455    # this is ported from scala/spark/RDD.scala
456    def takeSample(self, withReplacement, num, seed=None):
457        """
458        Return a fixed-size sampled subset of this RDD.
459
460        .. note:: This method should only be used if the resulting array is expected
461            to be small, as all the data is loaded into the driver's memory.
462
463        >>> rdd = sc.parallelize(range(0, 10))
464        >>> len(rdd.takeSample(True, 20, 1))
465        20
466        >>> len(rdd.takeSample(False, 5, 2))
467        5
468        >>> len(rdd.takeSample(False, 15, 3))
469        10
470        """
471        numStDev = 10.0
472
473        if num < 0:
474            raise ValueError("Sample size cannot be negative.")
475        elif num == 0:
476            return []
477
478        initialCount = self.count()
479        if initialCount == 0:
480            return []
481
482        rand = random.Random(seed)
483
484        if (not withReplacement) and num >= initialCount:
485            # shuffle current RDD and return
486            samples = self.collect()
487            rand.shuffle(samples)
488            return samples
489
490        maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
491        if num > maxSampleSize:
492            raise ValueError(
493                "Sample size cannot be greater than %d." % maxSampleSize)
494
495        fraction = RDD._computeFractionForSampleSize(
496            num, initialCount, withReplacement)
497        samples = self.sample(withReplacement, fraction, seed).collect()
498
499        # If the first sample didn't turn out large enough, keep trying to take samples;
500        # this shouldn't happen often because we use a big multiplier for their initial size.
501        # See: scala/spark/RDD.scala
502        while len(samples) < num:
503            # TODO: add log warning for when more than one iteration was run
504            seed = rand.randint(0, sys.maxsize)
505            samples = self.sample(withReplacement, fraction, seed).collect()
506
507        rand.shuffle(samples)
508
509        return samples[0:num]
510
511    @staticmethod
512    def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
513        """
514        Returns a sampling rate that guarantees a sample of
515        size >= sampleSizeLowerBound 99.99% of the time.
516
517        How the sampling rate is determined:
518        Let p = num / total, where num is the sample size and total is the
519        total number of data points in the RDD. We're trying to compute
520        q > p such that
521          - when sampling with replacement, we're drawing each data point
522            with prob_i ~ Pois(q), where we want to guarantee
523            Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
524            total), i.e. the failure rate of not having a sufficiently large
525            sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
526            to guarantee 0.9999 success rate for num > 12, but we need a
527            slightly larger q (9 empirically determined).
528          - when sampling without replacement, we're drawing each data point
529            with prob_i ~ Binomial(total, fraction) and our choice of q
530            guarantees 1-delta, or 0.9999 success rate, where success rate is
531            defined the same as in sampling with replacement.
532        """
533        fraction = float(sampleSizeLowerBound) / total
534        if withReplacement:
535            numStDev = 5
536            if (sampleSizeLowerBound < 12):
537                numStDev = 9
538            return fraction + numStDev * sqrt(fraction / total)
539        else:
540            delta = 0.00005
541            gamma = - log(delta) / total
542            return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
543
544    def union(self, other):
545        """
546        Return the union of this RDD and another one.
547
548        >>> rdd = sc.parallelize([1, 1, 2, 3])
549        >>> rdd.union(rdd).collect()
550        [1, 1, 2, 3, 1, 1, 2, 3]
551        """
552        if self._jrdd_deserializer == other._jrdd_deserializer:
553            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
554                      self._jrdd_deserializer)
555        else:
556            # These RDDs contain data in different serialized formats, so we
557            # must normalize them to the default serializer.
558            self_copy = self._reserialize()
559            other_copy = other._reserialize()
560            rdd = RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
561                      self.ctx.serializer)
562        if (self.partitioner == other.partitioner and
563                self.getNumPartitions() == rdd.getNumPartitions()):
564            rdd.partitioner = self.partitioner
565        return rdd
566
567    def intersection(self, other):
568        """
569        Return the intersection of this RDD and another one. The output will
570        not contain any duplicate elements, even if the input RDDs did.
571
572        .. note:: This method performs a shuffle internally.
573
574        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
575        >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
576        >>> rdd1.intersection(rdd2).collect()
577        [1, 2, 3]
578        """
579        return self.map(lambda v: (v, None)) \
580            .cogroup(other.map(lambda v: (v, None))) \
581            .filter(lambda k_vs: all(k_vs[1])) \
582            .keys()
583
584    def _reserialize(self, serializer=None):
585        serializer = serializer or self.ctx.serializer
586        if self._jrdd_deserializer != serializer:
587            self = self.map(lambda x: x, preservesPartitioning=True)
588            self._jrdd_deserializer = serializer
589        return self
590
591    def __add__(self, other):
592        """
593        Return the union of this RDD and another one.
594
595        >>> rdd = sc.parallelize([1, 1, 2, 3])
596        >>> (rdd + rdd).collect()
597        [1, 1, 2, 3, 1, 1, 2, 3]
598        """
599        if not isinstance(other, RDD):
600            raise TypeError
601        return self.union(other)
602
603    def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
604                                           ascending=True, keyfunc=lambda x: x):
605        """
606        Repartition the RDD according to the given partitioner and, within each resulting partition,
607        sort records by their keys.
608
609        >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
610        >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
611        >>> rdd2.glom().collect()
612        [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
613        """
614        if numPartitions is None:
615            numPartitions = self._defaultReducePartitions()
616
617        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
618        serializer = self._jrdd_deserializer
619
620        def sortPartition(iterator):
621            sort = ExternalSorter(memory * 0.9, serializer).sorted
622            return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
623
624        return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
625
626    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
627        """
628        Sorts this RDD, which is assumed to consist of (key, value) pairs.
629        # noqa
630
631        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
632        >>> sc.parallelize(tmp).sortByKey().first()
633        ('1', 3)
634        >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
635        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
636        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
637        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
638        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
639        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
640        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
641        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
642        """
643        if numPartitions is None:
644            numPartitions = self._defaultReducePartitions()
645
646        memory = self._memory_limit()
647        serializer = self._jrdd_deserializer
648
649        def sortPartition(iterator):
650            sort = ExternalSorter(memory * 0.9, serializer).sorted
651            return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
652
653        if numPartitions == 1:
654            if self.getNumPartitions() > 1:
655                self = self.coalesce(1)
656            return self.mapPartitions(sortPartition, True)
657
658        # first compute the boundary of each part via sampling: we want to partition
659        # the key-space into bins such that the bins have roughly the same
660        # number of (key, value) pairs falling into them
661        rddSize = self.count()
662        if not rddSize:
663            return self  # empty RDD
664        maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
665        fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
666        samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
667        samples = sorted(samples, key=keyfunc)
668
669        # we have numPartitions many parts but one of the them has
670        # an implicit boundary
671        bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
672                  for i in range(0, numPartitions - 1)]
673
674        def rangePartitioner(k):
675            p = bisect.bisect_left(bounds, keyfunc(k))
676            if ascending:
677                return p
678            else:
679                return numPartitions - 1 - p
680
681        return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
682
683    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
684        """
685        Sorts this RDD by the given keyfunc
686
687        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
688        >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
689        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
690        >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
691        [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
692        """
693        return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
694
695    def glom(self):
696        """
697        Return an RDD created by coalescing all elements within each partition
698        into a list.
699
700        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
701        >>> sorted(rdd.glom().collect())
702        [[1, 2], [3, 4]]
703        """
704        def func(iterator):
705            yield list(iterator)
706        return self.mapPartitions(func)
707
708    def cartesian(self, other):
709        """
710        Return the Cartesian product of this RDD and another one, that is, the
711        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
712        C{b} is in C{other}.
713
714        >>> rdd = sc.parallelize([1, 2])
715        >>> sorted(rdd.cartesian(rdd).collect())
716        [(1, 1), (1, 2), (2, 1), (2, 2)]
717        """
718        # Due to batching, we can't use the Java cartesian method.
719        deserializer = CartesianDeserializer(self._jrdd_deserializer,
720                                             other._jrdd_deserializer)
721        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
722
723    def groupBy(self, f, numPartitions=None, partitionFunc=portable_hash):
724        """
725        Return an RDD of grouped items.
726
727        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
728        >>> result = rdd.groupBy(lambda x: x % 2).collect()
729        >>> sorted([(x, sorted(y)) for (x, y) in result])
730        [(0, [2, 8]), (1, [1, 1, 3, 5])]
731        """
732        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)
733
734    @ignore_unicode_prefix
735    def pipe(self, command, env=None, checkCode=False):
736        """
737        Return an RDD created by piping elements to a forked external process.
738
739        >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
740        [u'1', u'2', u'', u'3']
741
742        :param checkCode: whether or not to check the return value of the shell command.
743        """
744        if env is None:
745            env = dict()
746
747        def func(iterator):
748            pipe = Popen(
749                shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
750
751            def pipe_objs(out):
752                for obj in iterator:
753                    s = str(obj).rstrip('\n') + '\n'
754                    out.write(s.encode('utf-8'))
755                out.close()
756            Thread(target=pipe_objs, args=[pipe.stdin]).start()
757
758            def check_return_code():
759                pipe.wait()
760                if checkCode and pipe.returncode:
761                    raise Exception("Pipe function `%s' exited "
762                                    "with error code %d" % (command, pipe.returncode))
763                else:
764                    for i in range(0):
765                        yield i
766            return (x.rstrip(b'\n').decode('utf-8') for x in
767                    chain(iter(pipe.stdout.readline, b''), check_return_code()))
768        return self.mapPartitions(func)
769
770    def foreach(self, f):
771        """
772        Applies a function to all elements of this RDD.
773
774        >>> def f(x): print(x)
775        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
776        """
777        def processPartition(iterator):
778            for x in iterator:
779                f(x)
780            return iter([])
781        self.mapPartitions(processPartition).count()  # Force evaluation
782
783    def foreachPartition(self, f):
784        """
785        Applies a function to each partition of this RDD.
786
787        >>> def f(iterator):
788        ...     for x in iterator:
789        ...          print(x)
790        >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
791        """
792        def func(it):
793            r = f(it)
794            try:
795                return iter(r)
796            except TypeError:
797                return iter([])
798        self.mapPartitions(func).count()  # Force evaluation
799
800    def collect(self):
801        """
802        Return a list that contains all of the elements in this RDD.
803
804        .. note:: This method should only be used if the resulting array is expected
805            to be small, as all the data is loaded into the driver's memory.
806        """
807        with SCCallSiteSync(self.context) as css:
808            port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
809        return list(_load_from_socket(port, self._jrdd_deserializer))
810
811    def reduce(self, f):
812        """
813        Reduces the elements of this RDD using the specified commutative and
814        associative binary operator. Currently reduces partitions locally.
815
816        >>> from operator import add
817        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
818        15
819        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
820        10
821        >>> sc.parallelize([]).reduce(add)
822        Traceback (most recent call last):
823            ...
824        ValueError: Can not reduce() empty RDD
825        """
826        def func(iterator):
827            iterator = iter(iterator)
828            try:
829                initial = next(iterator)
830            except StopIteration:
831                return
832            yield reduce(f, iterator, initial)
833
834        vals = self.mapPartitions(func).collect()
835        if vals:
836            return reduce(f, vals)
837        raise ValueError("Can not reduce() empty RDD")
838
839    def treeReduce(self, f, depth=2):
840        """
841        Reduces the elements of this RDD in a multi-level tree pattern.
842
843        :param depth: suggested depth of the tree (default: 2)
844
845        >>> add = lambda x, y: x + y
846        >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
847        >>> rdd.treeReduce(add)
848        -5
849        >>> rdd.treeReduce(add, 1)
850        -5
851        >>> rdd.treeReduce(add, 2)
852        -5
853        >>> rdd.treeReduce(add, 5)
854        -5
855        >>> rdd.treeReduce(add, 10)
856        -5
857        """
858        if depth < 1:
859            raise ValueError("Depth cannot be smaller than 1 but got %d." % depth)
860
861        zeroValue = None, True  # Use the second entry to indicate whether this is a dummy value.
862
863        def op(x, y):
864            if x[1]:
865                return y
866            elif y[1]:
867                return x
868            else:
869                return f(x[0], y[0]), False
870
871        reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
872        if reduced[1]:
873            raise ValueError("Cannot reduce empty RDD.")
874        return reduced[0]
875
876    def fold(self, zeroValue, op):
877        """
878        Aggregate the elements of each partition, and then the results for all
879        the partitions, using a given associative function and a neutral "zero value."
880
881        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
882        as its result value to avoid object allocation; however, it should not
883        modify C{t2}.
884
885        This behaves somewhat differently from fold operations implemented
886        for non-distributed collections in functional languages like Scala.
887        This fold operation may be applied to partitions individually, and then
888        fold those results into the final result, rather than apply the fold
889        to each element sequentially in some defined ordering. For functions
890        that are not commutative, the result may differ from that of a fold
891        applied to a non-distributed collection.
892
893        >>> from operator import add
894        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
895        15
896        """
897        def func(iterator):
898            acc = zeroValue
899            for obj in iterator:
900                acc = op(acc, obj)
901            yield acc
902        # collecting result of mapPartitions here ensures that the copy of
903        # zeroValue provided to each partition is unique from the one provided
904        # to the final reduce call
905        vals = self.mapPartitions(func).collect()
906        return reduce(op, vals, zeroValue)
907
908    def aggregate(self, zeroValue, seqOp, combOp):
909        """
910        Aggregate the elements of each partition, and then the results for all
911        the partitions, using a given combine functions and a neutral "zero
912        value."
913
914        The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
915        as its result value to avoid object allocation; however, it should not
916        modify C{t2}.
917
918        The first function (seqOp) can return a different result type, U, than
919        the type of this RDD. Thus, we need one operation for merging a T into
920        an U and one operation for merging two U
921
922        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
923        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
924        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
925        (10, 4)
926        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
927        (0, 0)
928        """
929        def func(iterator):
930            acc = zeroValue
931            for obj in iterator:
932                acc = seqOp(acc, obj)
933            yield acc
934        # collecting result of mapPartitions here ensures that the copy of
935        # zeroValue provided to each partition is unique from the one provided
936        # to the final reduce call
937        vals = self.mapPartitions(func).collect()
938        return reduce(combOp, vals, zeroValue)
939
940    def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
941        """
942        Aggregates the elements of this RDD in a multi-level tree
943        pattern.
944
945        :param depth: suggested depth of the tree (default: 2)
946
947        >>> add = lambda x, y: x + y
948        >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
949        >>> rdd.treeAggregate(0, add, add)
950        -5
951        >>> rdd.treeAggregate(0, add, add, 1)
952        -5
953        >>> rdd.treeAggregate(0, add, add, 2)
954        -5
955        >>> rdd.treeAggregate(0, add, add, 5)
956        -5
957        >>> rdd.treeAggregate(0, add, add, 10)
958        -5
959        """
960        if depth < 1:
961            raise ValueError("Depth cannot be smaller than 1 but got %d." % depth)
962
963        if self.getNumPartitions() == 0:
964            return zeroValue
965
966        def aggregatePartition(iterator):
967            acc = zeroValue
968            for obj in iterator:
969                acc = seqOp(acc, obj)
970            yield acc
971
972        partiallyAggregated = self.mapPartitions(aggregatePartition)
973        numPartitions = partiallyAggregated.getNumPartitions()
974        scale = max(int(ceil(pow(numPartitions, 1.0 / depth))), 2)
975        # If creating an extra level doesn't help reduce the wall-clock time, we stop the tree
976        # aggregation.
977        while numPartitions > scale + numPartitions / scale:
978            numPartitions /= scale
979            curNumPartitions = int(numPartitions)
980
981            def mapPartition(i, iterator):
982                for obj in iterator:
983                    yield (i % curNumPartitions, obj)
984
985            partiallyAggregated = partiallyAggregated \
986                .mapPartitionsWithIndex(mapPartition) \
987                .reduceByKey(combOp, curNumPartitions) \
988                .values()
989
990        return partiallyAggregated.reduce(combOp)
991
992    def max(self, key=None):
993        """
994        Find the maximum item in this RDD.
995
996        :param key: A function used to generate key for comparing
997
998        >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
999        >>> rdd.max()
1000        43.0
1001        >>> rdd.max(key=str)
1002        5.0
1003        """
1004        if key is None:
1005            return self.reduce(max)
1006        return self.reduce(lambda a, b: max(a, b, key=key))
1007
1008    def min(self, key=None):
1009        """
1010        Find the minimum item in this RDD.
1011
1012        :param key: A function used to generate key for comparing
1013
1014        >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
1015        >>> rdd.min()
1016        2.0
1017        >>> rdd.min(key=str)
1018        10.0
1019        """
1020        if key is None:
1021            return self.reduce(min)
1022        return self.reduce(lambda a, b: min(a, b, key=key))
1023
1024    def sum(self):
1025        """
1026        Add up the elements in this RDD.
1027
1028        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
1029        6.0
1030        """
1031        return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1032
1033    def count(self):
1034        """
1035        Return the number of elements in this RDD.
1036
1037        >>> sc.parallelize([2, 3, 4]).count()
1038        3
1039        """
1040        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1041
1042    def stats(self):
1043        """
1044        Return a L{StatCounter} object that captures the mean, variance
1045        and count of the RDD's elements in one operation.
1046        """
1047        def redFunc(left_counter, right_counter):
1048            return left_counter.mergeStats(right_counter)
1049
1050        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
1051
1052    def histogram(self, buckets):
1053        """
1054        Compute a histogram using the provided buckets. The buckets
1055        are all open to the right except for the last which is closed.
1056        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
1057        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
1058        and 50 we would have a histogram of 1,0,1.
1059
1060        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
1061        this can be switched from an O(log n) inseration to O(1) per
1062        element (where n is the number of buckets).
1063
1064        Buckets must be sorted, not contain any duplicates, and have
1065        at least two elements.
1066
1067        If `buckets` is a number, it will generate buckets which are
1068        evenly spaced between the minimum and maximum of the RDD. For
1069        example, if the min value is 0 and the max is 100, given `buckets`
1070        as 2, the resulting buckets will be [0,50) [50,100]. `buckets` must
1071        be at least 1. An exception is raised if the RDD contains infinity.
1072        If the elements in the RDD do not vary (max == min), a single bucket
1073        will be used.
1074
1075        The return value is a tuple of buckets and histogram.
1076
1077        >>> rdd = sc.parallelize(range(51))
1078        >>> rdd.histogram(2)
1079        ([0, 25, 50], [25, 26])
1080        >>> rdd.histogram([0, 5, 25, 50])
1081        ([0, 5, 25, 50], [5, 20, 26])
1082        >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
1083        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
1084        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
1085        >>> rdd.histogram(("a", "b", "c"))
1086        (('a', 'b', 'c'), [2, 2])
1087        """
1088
1089        if isinstance(buckets, int):
1090            if buckets < 1:
1091                raise ValueError("number of buckets must be >= 1")
1092
1093            # filter out non-comparable elements
1094            def comparable(x):
1095                if x is None:
1096                    return False
1097                if type(x) is float and isnan(x):
1098                    return False
1099                return True
1100
1101            filtered = self.filter(comparable)
1102
1103            # faster than stats()
1104            def minmax(a, b):
1105                return min(a[0], b[0]), max(a[1], b[1])
1106            try:
1107                minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
1108            except TypeError as e:
1109                if " empty " in str(e):
1110                    raise ValueError("can not generate buckets from empty RDD")
1111                raise
1112
1113            if minv == maxv or buckets == 1:
1114                return [minv, maxv], [filtered.count()]
1115
1116            try:
1117                inc = (maxv - minv) / buckets
1118            except TypeError:
1119                raise TypeError("Can not generate buckets with non-number in RDD")
1120
1121            if isinf(inc):
1122                raise ValueError("Can not generate buckets with infinite value")
1123
1124            # keep them as integer if possible
1125            inc = int(inc)
1126            if inc * buckets != maxv - minv:
1127                inc = (maxv - minv) * 1.0 / buckets
1128
1129            buckets = [i * inc + minv for i in range(buckets)]
1130            buckets.append(maxv)  # fix accumulated error
1131            even = True
1132
1133        elif isinstance(buckets, (list, tuple)):
1134            if len(buckets) < 2:
1135                raise ValueError("buckets should have more than one value")
1136
1137            if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
1138                raise ValueError("can not have None or NaN in buckets")
1139
1140            if sorted(buckets) != list(buckets):
1141                raise ValueError("buckets should be sorted")
1142
1143            if len(set(buckets)) != len(buckets):
1144                raise ValueError("buckets should not contain duplicated values")
1145
1146            minv = buckets[0]
1147            maxv = buckets[-1]
1148            even = False
1149            inc = None
1150            try:
1151                steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
1152            except TypeError:
1153                pass  # objects in buckets do not support '-'
1154            else:
1155                if max(steps) - min(steps) < 1e-10:  # handle precision errors
1156                    even = True
1157                    inc = (maxv - minv) / (len(buckets) - 1)
1158
1159        else:
1160            raise TypeError("buckets should be a list or tuple or number(int or long)")
1161
1162        def histogram(iterator):
1163            counters = [0] * len(buckets)
1164            for i in iterator:
1165                if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
1166                    continue
1167                t = (int((i - minv) / inc) if even
1168                     else bisect.bisect_right(buckets, i) - 1)
1169                counters[t] += 1
1170            # add last two together
1171            last = counters.pop()
1172            counters[-1] += last
1173            return [counters]
1174
1175        def mergeCounters(a, b):
1176            return [i + j for i, j in zip(a, b)]
1177
1178        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
1179
1180    def mean(self):
1181        """
1182        Compute the mean of this RDD's elements.
1183
1184        >>> sc.parallelize([1, 2, 3]).mean()
1185        2.0
1186        """
1187        return self.stats().mean()
1188
1189    def variance(self):
1190        """
1191        Compute the variance of this RDD's elements.
1192
1193        >>> sc.parallelize([1, 2, 3]).variance()
1194        0.666...
1195        """
1196        return self.stats().variance()
1197
1198    def stdev(self):
1199        """
1200        Compute the standard deviation of this RDD's elements.
1201
1202        >>> sc.parallelize([1, 2, 3]).stdev()
1203        0.816...
1204        """
1205        return self.stats().stdev()
1206
1207    def sampleStdev(self):
1208        """
1209        Compute the sample standard deviation of this RDD's elements (which
1210        corrects for bias in estimating the standard deviation by dividing by
1211        N-1 instead of N).
1212
1213        >>> sc.parallelize([1, 2, 3]).sampleStdev()
1214        1.0
1215        """
1216        return self.stats().sampleStdev()
1217
1218    def sampleVariance(self):
1219        """
1220        Compute the sample variance of this RDD's elements (which corrects
1221        for bias in estimating the variance by dividing by N-1 instead of N).
1222
1223        >>> sc.parallelize([1, 2, 3]).sampleVariance()
1224        1.0
1225        """
1226        return self.stats().sampleVariance()
1227
1228    def countByValue(self):
1229        """
1230        Return the count of each unique value in this RDD as a dictionary of
1231        (value, count) pairs.
1232
1233        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
1234        [(1, 2), (2, 3)]
1235        """
1236        def countPartition(iterator):
1237            counts = defaultdict(int)
1238            for obj in iterator:
1239                counts[obj] += 1
1240            yield counts
1241
1242        def mergeMaps(m1, m2):
1243            for k, v in m2.items():
1244                m1[k] += v
1245            return m1
1246        return self.mapPartitions(countPartition).reduce(mergeMaps)
1247
1248    def top(self, num, key=None):
1249        """
1250        Get the top N elements from an RDD.
1251
1252        .. note:: This method should only be used if the resulting array is expected
1253            to be small, as all the data is loaded into the driver's memory.
1254
1255        .. note:: It returns the list sorted in descending order.
1256
1257        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
1258        [12]
1259        >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
1260        [6, 5]
1261        >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
1262        [4, 3, 2]
1263        """
1264        def topIterator(iterator):
1265            yield heapq.nlargest(num, iterator, key=key)
1266
1267        def merge(a, b):
1268            return heapq.nlargest(num, a + b, key=key)
1269
1270        return self.mapPartitions(topIterator).reduce(merge)
1271
1272    def takeOrdered(self, num, key=None):
1273        """
1274        Get the N elements from an RDD ordered in ascending order or as
1275        specified by the optional key function.
1276
1277        .. note:: this method should only be used if the resulting array is expected
1278            to be small, as all the data is loaded into the driver's memory.
1279
1280        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
1281        [1, 2, 3, 4, 5, 6]
1282        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
1283        [10, 9, 7, 6, 5, 4]
1284        """
1285
1286        def merge(a, b):
1287            return heapq.nsmallest(num, a + b, key)
1288
1289        return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
1290
1291    def take(self, num):
1292        """
1293        Take the first num elements of the RDD.
1294
1295        It works by first scanning one partition, and use the results from
1296        that partition to estimate the number of additional partitions needed
1297        to satisfy the limit.
1298
1299        Translated from the Scala implementation in RDD#take().
1300
1301        .. note:: this method should only be used if the resulting array is expected
1302            to be small, as all the data is loaded into the driver's memory.
1303
1304        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
1305        [2, 3]
1306        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
1307        [2, 3, 4, 5, 6]
1308        >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
1309        [91, 92, 93]
1310        """
1311        items = []
1312        totalParts = self.getNumPartitions()
1313        partsScanned = 0
1314
1315        while len(items) < num and partsScanned < totalParts:
1316            # The number of partitions to try in this iteration.
1317            # It is ok for this number to be greater than totalParts because
1318            # we actually cap it at totalParts in runJob.
1319            numPartsToTry = 1
1320            if partsScanned > 0:
1321                # If we didn't find any rows after the previous iteration,
1322                # quadruple and retry.  Otherwise, interpolate the number of
1323                # partitions we need to try, but overestimate it by 50%.
1324                # We also cap the estimation in the end.
1325                if len(items) == 0:
1326                    numPartsToTry = partsScanned * 4
1327                else:
1328                    # the first paramter of max is >=1 whenever partsScanned >= 2
1329                    numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned
1330                    numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4)
1331
1332            left = num - len(items)
1333
1334            def takeUpToNumLeft(iterator):
1335                iterator = iter(iterator)
1336                taken = 0
1337                while taken < left:
1338                    yield next(iterator)
1339                    taken += 1
1340
1341            p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
1342            res = self.context.runJob(self, takeUpToNumLeft, p)
1343
1344            items += res
1345            partsScanned += numPartsToTry
1346
1347        return items[:num]
1348
1349    def first(self):
1350        """
1351        Return the first element in this RDD.
1352
1353        >>> sc.parallelize([2, 3, 4]).first()
1354        2
1355        >>> sc.parallelize([]).first()
1356        Traceback (most recent call last):
1357            ...
1358        ValueError: RDD is empty
1359        """
1360        rs = self.take(1)
1361        if rs:
1362            return rs[0]
1363        raise ValueError("RDD is empty")
1364
1365    def isEmpty(self):
1366        """
1367        Returns true if and only if the RDD contains no elements at all.
1368
1369        .. note:: an RDD may be empty even when it has at least 1 partition.
1370
1371        >>> sc.parallelize([]).isEmpty()
1372        True
1373        >>> sc.parallelize([1]).isEmpty()
1374        False
1375        """
1376        return self.getNumPartitions() == 0 or len(self.take(1)) == 0
1377
1378    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
1379        """
1380        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1381        system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
1382        converted for output using either user specified converters or, by default,
1383        L{org.apache.spark.api.python.JavaToWritableConverter}.
1384
1385        :param conf: Hadoop job configuration, passed in as a dict
1386        :param keyConverter: (None by default)
1387        :param valueConverter: (None by default)
1388        """
1389        jconf = self.ctx._dictToJavaMap(conf)
1390        pickledRDD = self._pickled()
1391        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
1392                                                    keyConverter, valueConverter, True)
1393
1394    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
1395                               keyConverter=None, valueConverter=None, conf=None):
1396        """
1397        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1398        system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
1399        will be inferred if not specified. Keys and values are converted for output using either
1400        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
1401        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
1402        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
1403
1404        :param path: path to Hadoop file
1405        :param outputFormatClass: fully qualified classname of Hadoop OutputFormat
1406               (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
1407        :param keyClass: fully qualified classname of key Writable class
1408               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
1409        :param valueClass: fully qualified classname of value Writable class
1410               (e.g. "org.apache.hadoop.io.Text", None by default)
1411        :param keyConverter: (None by default)
1412        :param valueConverter: (None by default)
1413        :param conf: Hadoop job configuration, passed in as a dict (None by default)
1414        """
1415        jconf = self.ctx._dictToJavaMap(conf)
1416        pickledRDD = self._pickled()
1417        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
1418                                                       outputFormatClass,
1419                                                       keyClass, valueClass,
1420                                                       keyConverter, valueConverter, jconf)
1421
1422    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
1423        """
1424        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1425        system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
1426        converted for output using either user specified converters or, by default,
1427        L{org.apache.spark.api.python.JavaToWritableConverter}.
1428
1429        :param conf: Hadoop job configuration, passed in as a dict
1430        :param keyConverter: (None by default)
1431        :param valueConverter: (None by default)
1432        """
1433        jconf = self.ctx._dictToJavaMap(conf)
1434        pickledRDD = self._pickled()
1435        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
1436                                                    keyConverter, valueConverter, False)
1437
1438    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
1439                         keyConverter=None, valueConverter=None, conf=None,
1440                         compressionCodecClass=None):
1441        """
1442        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1443        system, using the old Hadoop OutputFormat API (mapred package). Key and value types
1444        will be inferred if not specified. Keys and values are converted for output using either
1445        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
1446        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
1447        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
1448
1449        :param path: path to Hadoop file
1450        :param outputFormatClass: fully qualified classname of Hadoop OutputFormat
1451               (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
1452        :param keyClass: fully qualified classname of key Writable class
1453               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
1454        :param valueClass: fully qualified classname of value Writable class
1455               (e.g. "org.apache.hadoop.io.Text", None by default)
1456        :param keyConverter: (None by default)
1457        :param valueConverter: (None by default)
1458        :param conf: (None by default)
1459        :param compressionCodecClass: (None by default)
1460        """
1461        jconf = self.ctx._dictToJavaMap(conf)
1462        pickledRDD = self._pickled()
1463        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
1464                                                 outputFormatClass,
1465                                                 keyClass, valueClass,
1466                                                 keyConverter, valueConverter,
1467                                                 jconf, compressionCodecClass)
1468
1469    def saveAsSequenceFile(self, path, compressionCodecClass=None):
1470        """
1471        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1472        system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
1473        RDD's key and value types. The mechanism is as follows:
1474
1475            1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
1476            2. Keys and values of this Java RDD are converted to Writables and written out.
1477
1478        :param path: path to sequence file
1479        :param compressionCodecClass: (None by default)
1480        """
1481        pickledRDD = self._pickled()
1482        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
1483                                                   path, compressionCodecClass)
1484
1485    def saveAsPickleFile(self, path, batchSize=10):
1486        """
1487        Save this RDD as a SequenceFile of serialized objects. The serializer
1488        used is L{pyspark.serializers.PickleSerializer}, default batch size
1489        is 10.
1490
1491        >>> tmpFile = NamedTemporaryFile(delete=True)
1492        >>> tmpFile.close()
1493        >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
1494        >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
1495        ['1', '2', 'rdd', 'spark']
1496        """
1497        if batchSize == 0:
1498            ser = AutoBatchedSerializer(PickleSerializer())
1499        else:
1500            ser = BatchedSerializer(PickleSerializer(), batchSize)
1501        self._reserialize(ser)._jrdd.saveAsObjectFile(path)
1502
1503    @ignore_unicode_prefix
1504    def saveAsTextFile(self, path, compressionCodecClass=None):
1505        """
1506        Save this RDD as a text file, using string representations of elements.
1507
1508        @param path: path to text file
1509        @param compressionCodecClass: (None by default) string i.e.
1510            "org.apache.hadoop.io.compress.GzipCodec"
1511
1512        >>> tempFile = NamedTemporaryFile(delete=True)
1513        >>> tempFile.close()
1514        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
1515        >>> from fileinput import input
1516        >>> from glob import glob
1517        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
1518        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
1519
1520        Empty lines are tolerated when saving to text files.
1521
1522        >>> tempFile2 = NamedTemporaryFile(delete=True)
1523        >>> tempFile2.close()
1524        >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
1525        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
1526        '\\n\\n\\nbar\\nfoo\\n'
1527
1528        Using compressionCodecClass
1529
1530        >>> tempFile3 = NamedTemporaryFile(delete=True)
1531        >>> tempFile3.close()
1532        >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
1533        >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
1534        >>> from fileinput import input, hook_compressed
1535        >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
1536        >>> b''.join(result).decode('utf-8')
1537        u'bar\\nfoo\\n'
1538        """
1539        def func(split, iterator):
1540            for x in iterator:
1541                if not isinstance(x, (unicode, bytes)):
1542                    x = unicode(x)
1543                if isinstance(x, unicode):
1544                    x = x.encode("utf-8")
1545                yield x
1546        keyed = self.mapPartitionsWithIndex(func)
1547        keyed._bypass_serializer = True
1548        if compressionCodecClass:
1549            compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass)
1550            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
1551        else:
1552            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
1553
1554    # Pair functions
1555
1556    def collectAsMap(self):
1557        """
1558        Return the key-value pairs in this RDD to the master as a dictionary.
1559
1560        .. note:: this method should only be used if the resulting data is expected
1561            to be small, as all the data is loaded into the driver's memory.
1562
1563        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
1564        >>> m[1]
1565        2
1566        >>> m[3]
1567        4
1568        """
1569        return dict(self.collect())
1570
1571    def keys(self):
1572        """
1573        Return an RDD with the keys of each tuple.
1574
1575        >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
1576        >>> m.collect()
1577        [1, 3]
1578        """
1579        return self.map(lambda x: x[0])
1580
1581    def values(self):
1582        """
1583        Return an RDD with the values of each tuple.
1584
1585        >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
1586        >>> m.collect()
1587        [2, 4]
1588        """
1589        return self.map(lambda x: x[1])
1590
1591    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
1592        """
1593        Merge the values for each key using an associative and commutative reduce function.
1594
1595        This will also perform the merging locally on each mapper before
1596        sending results to a reducer, similarly to a "combiner" in MapReduce.
1597
1598        Output will be partitioned with C{numPartitions} partitions, or
1599        the default parallelism level if C{numPartitions} is not specified.
1600        Default partitioner is hash-partition.
1601
1602        >>> from operator import add
1603        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1604        >>> sorted(rdd.reduceByKey(add).collect())
1605        [('a', 2), ('b', 1)]
1606        """
1607        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
1608
1609    def reduceByKeyLocally(self, func):
1610        """
1611        Merge the values for each key using an associative and commutative reduce function, but
1612        return the results immediately to the master as a dictionary.
1613
1614        This will also perform the merging locally on each mapper before
1615        sending results to a reducer, similarly to a "combiner" in MapReduce.
1616
1617        >>> from operator import add
1618        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1619        >>> sorted(rdd.reduceByKeyLocally(add).items())
1620        [('a', 2), ('b', 1)]
1621        """
1622        def reducePartition(iterator):
1623            m = {}
1624            for k, v in iterator:
1625                m[k] = func(m[k], v) if k in m else v
1626            yield m
1627
1628        def mergeMaps(m1, m2):
1629            for k, v in m2.items():
1630                m1[k] = func(m1[k], v) if k in m1 else v
1631            return m1
1632        return self.mapPartitions(reducePartition).reduce(mergeMaps)
1633
1634    def countByKey(self):
1635        """
1636        Count the number of elements for each key, and return the result to the
1637        master as a dictionary.
1638
1639        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1640        >>> sorted(rdd.countByKey().items())
1641        [('a', 2), ('b', 1)]
1642        """
1643        return self.map(lambda x: x[0]).countByValue()
1644
1645    def join(self, other, numPartitions=None):
1646        """
1647        Return an RDD containing all pairs of elements with matching keys in
1648        C{self} and C{other}.
1649
1650        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
1651        (k, v1) is in C{self} and (k, v2) is in C{other}.
1652
1653        Performs a hash join across the cluster.
1654
1655        >>> x = sc.parallelize([("a", 1), ("b", 4)])
1656        >>> y = sc.parallelize([("a", 2), ("a", 3)])
1657        >>> sorted(x.join(y).collect())
1658        [('a', (1, 2)), ('a', (1, 3))]
1659        """
1660        return python_join(self, other, numPartitions)
1661
1662    def leftOuterJoin(self, other, numPartitions=None):
1663        """
1664        Perform a left outer join of C{self} and C{other}.
1665
1666        For each element (k, v) in C{self}, the resulting RDD will either
1667        contain all pairs (k, (v, w)) for w in C{other}, or the pair
1668        (k, (v, None)) if no elements in C{other} have key k.
1669
1670        Hash-partitions the resulting RDD into the given number of partitions.
1671
1672        >>> x = sc.parallelize([("a", 1), ("b", 4)])
1673        >>> y = sc.parallelize([("a", 2)])
1674        >>> sorted(x.leftOuterJoin(y).collect())
1675        [('a', (1, 2)), ('b', (4, None))]
1676        """
1677        return python_left_outer_join(self, other, numPartitions)
1678
1679    def rightOuterJoin(self, other, numPartitions=None):
1680        """
1681        Perform a right outer join of C{self} and C{other}.
1682
1683        For each element (k, w) in C{other}, the resulting RDD will either
1684        contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
1685        if no elements in C{self} have key k.
1686
1687        Hash-partitions the resulting RDD into the given number of partitions.
1688
1689        >>> x = sc.parallelize([("a", 1), ("b", 4)])
1690        >>> y = sc.parallelize([("a", 2)])
1691        >>> sorted(y.rightOuterJoin(x).collect())
1692        [('a', (2, 1)), ('b', (None, 4))]
1693        """
1694        return python_right_outer_join(self, other, numPartitions)
1695
1696    def fullOuterJoin(self, other, numPartitions=None):
1697        """
1698        Perform a right outer join of C{self} and C{other}.
1699
1700        For each element (k, v) in C{self}, the resulting RDD will either
1701        contain all pairs (k, (v, w)) for w in C{other}, or the pair
1702        (k, (v, None)) if no elements in C{other} have key k.
1703
1704        Similarly, for each element (k, w) in C{other}, the resulting RDD will
1705        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
1706        (k, (None, w)) if no elements in C{self} have key k.
1707
1708        Hash-partitions the resulting RDD into the given number of partitions.
1709
1710        >>> x = sc.parallelize([("a", 1), ("b", 4)])
1711        >>> y = sc.parallelize([("a", 2), ("c", 8)])
1712        >>> sorted(x.fullOuterJoin(y).collect())
1713        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
1714        """
1715        return python_full_outer_join(self, other, numPartitions)
1716
1717    # TODO: add option to control map-side combining
1718    # portable_hash is used as default, because builtin hash of None is different
1719    # cross machines.
1720    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
1721        """
1722        Return a copy of the RDD partitioned using the specified partitioner.
1723
1724        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
1725        >>> sets = pairs.partitionBy(2).glom().collect()
1726        >>> len(set(sets[0]).intersection(set(sets[1])))
1727        0
1728        """
1729        if numPartitions is None:
1730            numPartitions = self._defaultReducePartitions()
1731        partitioner = Partitioner(numPartitions, partitionFunc)
1732        if self.partitioner == partitioner:
1733            return self
1734
1735        # Transferring O(n) objects to Java is too expensive.
1736        # Instead, we'll form the hash buckets in Python,
1737        # transferring O(numPartitions) objects to Java.
1738        # Each object is a (splitNumber, [objects]) pair.
1739        # In order to avoid too huge objects, the objects are
1740        # grouped into chunks.
1741        outputSerializer = self.ctx._unbatched_serializer
1742
1743        limit = (_parse_memory(self.ctx._conf.get(
1744            "spark.python.worker.memory", "512m")) / 2)
1745
1746        def add_shuffle_key(split, iterator):
1747
1748            buckets = defaultdict(list)
1749            c, batch = 0, min(10 * numPartitions, 1000)
1750
1751            for k, v in iterator:
1752                buckets[partitionFunc(k) % numPartitions].append((k, v))
1753                c += 1
1754
1755                # check used memory and avg size of chunk of objects
1756                if (c % 1000 == 0 and get_used_memory() > limit
1757                        or c > batch):
1758                    n, size = len(buckets), 0
1759                    for split in list(buckets.keys()):
1760                        yield pack_long(split)
1761                        d = outputSerializer.dumps(buckets[split])
1762                        del buckets[split]
1763                        yield d
1764                        size += len(d)
1765
1766                    avg = int(size / n) >> 20
1767                    # let 1M < avg < 10M
1768                    if avg < 1:
1769                        batch *= 1.5
1770                    elif avg > 10:
1771                        batch = max(int(batch / 1.5), 1)
1772                    c = 0
1773
1774            for split, items in buckets.items():
1775                yield pack_long(split)
1776                yield outputSerializer.dumps(items)
1777
1778        keyed = self.mapPartitionsWithIndex(add_shuffle_key, preservesPartitioning=True)
1779        keyed._bypass_serializer = True
1780        with SCCallSiteSync(self.context) as css:
1781            pairRDD = self.ctx._jvm.PairwiseRDD(
1782                keyed._jrdd.rdd()).asJavaPairRDD()
1783            jpartitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
1784                                                           id(partitionFunc))
1785        jrdd = self.ctx._jvm.PythonRDD.valueOfPair(pairRDD.partitionBy(jpartitioner))
1786        rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
1787        rdd.partitioner = partitioner
1788        return rdd
1789
1790    # TODO: add control over map-side aggregation
1791    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
1792                     numPartitions=None, partitionFunc=portable_hash):
1793        """
1794        Generic function to combine the elements for each key using a custom
1795        set of aggregation functions.
1796
1797        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
1798        type" C.
1799
1800        Users provide three functions:
1801
1802            - C{createCombiner}, which turns a V into a C (e.g., creates
1803              a one-element list)
1804            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
1805              a list)
1806            - C{mergeCombiners}, to combine two C's into a single one.
1807
1808        In addition, users can control the partitioning of the output RDD.
1809
1810        .. note:: V and C can be different -- for example, one might group an RDD of type
1811            (Int, Int) into an RDD of type (Int, List[Int]).
1812
1813        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1814        >>> def add(a, b): return a + str(b)
1815        >>> sorted(x.combineByKey(str, add, add).collect())
1816        [('a', '11'), ('b', '1')]
1817        """
1818        if numPartitions is None:
1819            numPartitions = self._defaultReducePartitions()
1820
1821        serializer = self.ctx.serializer
1822        memory = self._memory_limit()
1823        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
1824
1825        def combineLocally(iterator):
1826            merger = ExternalMerger(agg, memory * 0.9, serializer)
1827            merger.mergeValues(iterator)
1828            return merger.items()
1829
1830        locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
1831        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)
1832
1833        def _mergeCombiners(iterator):
1834            merger = ExternalMerger(agg, memory, serializer)
1835            merger.mergeCombiners(iterator)
1836            return merger.items()
1837
1838        return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
1839
1840    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
1841                       partitionFunc=portable_hash):
1842        """
1843        Aggregate the values of each key, using given combine functions and a neutral
1844        "zero value". This function can return a different result type, U, than the type
1845        of the values in this RDD, V. Thus, we need one operation for merging a V into
1846        a U and one operation for merging two U's, The former operation is used for merging
1847        values within a partition, and the latter is used for merging values between
1848        partitions. To avoid memory allocation, both of these functions are
1849        allowed to modify and return their first argument instead of creating a new U.
1850        """
1851        def createZero():
1852            return copy.deepcopy(zeroValue)
1853
1854        return self.combineByKey(
1855            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc)
1856
1857    def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):
1858        """
1859        Merge the values for each key using an associative function "func"
1860        and a neutral "zeroValue" which may be added to the result an
1861        arbitrary number of times, and must not change the result
1862        (e.g., 0 for addition, or 1 for multiplication.).
1863
1864        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1865        >>> from operator import add
1866        >>> sorted(rdd.foldByKey(0, add).collect())
1867        [('a', 2), ('b', 1)]
1868        """
1869        def createZero():
1870            return copy.deepcopy(zeroValue)
1871
1872        return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions,
1873                                 partitionFunc)
1874
1875    def _memory_limit(self):
1876        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
1877
1878    # TODO: support variant with custom partitioner
1879    def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
1880        """
1881        Group the values for each key in the RDD into a single sequence.
1882        Hash-partitions the resulting RDD with numPartitions partitions.
1883
1884        .. note:: If you are grouping in order to perform an aggregation (such as a
1885            sum or average) over each key, using reduceByKey or aggregateByKey will
1886            provide much better performance.
1887
1888        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1889        >>> sorted(rdd.groupByKey().mapValues(len).collect())
1890        [('a', 2), ('b', 1)]
1891        >>> sorted(rdd.groupByKey().mapValues(list).collect())
1892        [('a', [1, 1]), ('b', [1])]
1893        """
1894        def createCombiner(x):
1895            return [x]
1896
1897        def mergeValue(xs, x):
1898            xs.append(x)
1899            return xs
1900
1901        def mergeCombiners(a, b):
1902            a.extend(b)
1903            return a
1904
1905        memory = self._memory_limit()
1906        serializer = self._jrdd_deserializer
1907        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
1908
1909        def combine(iterator):
1910            merger = ExternalMerger(agg, memory * 0.9, serializer)
1911            merger.mergeValues(iterator)
1912            return merger.items()
1913
1914        locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
1915        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)
1916
1917        def groupByKey(it):
1918            merger = ExternalGroupBy(agg, memory, serializer)
1919            merger.mergeCombiners(it)
1920            return merger.items()
1921
1922        return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
1923
1924    def flatMapValues(self, f):
1925        """
1926        Pass each value in the key-value pair RDD through a flatMap function
1927        without changing the keys; this also retains the original RDD's
1928        partitioning.
1929
1930        >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
1931        >>> def f(x): return x
1932        >>> x.flatMapValues(f).collect()
1933        [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
1934        """
1935        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
1936        return self.flatMap(flat_map_fn, preservesPartitioning=True)
1937
1938    def mapValues(self, f):
1939        """
1940        Pass each value in the key-value pair RDD through a map function
1941        without changing the keys; this also retains the original RDD's
1942        partitioning.
1943
1944        >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
1945        >>> def f(x): return len(x)
1946        >>> x.mapValues(f).collect()
1947        [('a', 3), ('b', 1)]
1948        """
1949        map_values_fn = lambda kv: (kv[0], f(kv[1]))
1950        return self.map(map_values_fn, preservesPartitioning=True)
1951
1952    def groupWith(self, other, *others):
1953        """
1954        Alias for cogroup but with support for multiple RDDs.
1955
1956        >>> w = sc.parallelize([("a", 5), ("b", 6)])
1957        >>> x = sc.parallelize([("a", 1), ("b", 4)])
1958        >>> y = sc.parallelize([("a", 2)])
1959        >>> z = sc.parallelize([("b", 42)])
1960        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
1961        [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
1962
1963        """
1964        return python_cogroup((self, other) + others, numPartitions=None)
1965
1966    # TODO: add variant with custom parittioner
1967    def cogroup(self, other, numPartitions=None):
1968        """
1969        For each key k in C{self} or C{other}, return a resulting RDD that
1970        contains a tuple with the list of values for that key in C{self} as
1971        well as C{other}.
1972
1973        >>> x = sc.parallelize([("a", 1), ("b", 4)])
1974        >>> y = sc.parallelize([("a", 2)])
1975        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
1976        [('a', ([1], [2])), ('b', ([4], []))]
1977        """
1978        return python_cogroup((self, other), numPartitions)
1979
1980    def sampleByKey(self, withReplacement, fractions, seed=None):
1981        """
1982        Return a subset of this RDD sampled by key (via stratified sampling).
1983        Create a sample of this RDD using variable sampling rates for
1984        different keys as specified by fractions, a key to sampling rate map.
1985
1986        >>> fractions = {"a": 0.2, "b": 0.1}
1987        >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
1988        >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
1989        >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
1990        True
1991        >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
1992        True
1993        >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
1994        True
1995        """
1996        for fraction in fractions.values():
1997            assert fraction >= 0.0, "Negative fraction value: %s" % fraction
1998        return self.mapPartitionsWithIndex(
1999            RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
2000
2001    def subtractByKey(self, other, numPartitions=None):
2002        """
2003        Return each (key, value) pair in C{self} that has no pair with matching
2004        key in C{other}.
2005
2006        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
2007        >>> y = sc.parallelize([("a", 3), ("c", None)])
2008        >>> sorted(x.subtractByKey(y).collect())
2009        [('b', 4), ('b', 5)]
2010        """
2011        def filter_func(pair):
2012            key, (val1, val2) = pair
2013            return val1 and not val2
2014        return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0])
2015
2016    def subtract(self, other, numPartitions=None):
2017        """
2018        Return each value in C{self} that is not contained in C{other}.
2019
2020        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
2021        >>> y = sc.parallelize([("a", 3), ("c", None)])
2022        >>> sorted(x.subtract(y).collect())
2023        [('a', 1), ('b', 4), ('b', 5)]
2024        """
2025        # note: here 'True' is just a placeholder
2026        rdd = other.map(lambda x: (x, True))
2027        return self.map(lambda x: (x, True)).subtractByKey(rdd, numPartitions).keys()
2028
2029    def keyBy(self, f):
2030        """
2031        Creates tuples of the elements in this RDD by applying C{f}.
2032
2033        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
2034        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
2035        >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
2036        [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
2037        """
2038        return self.map(lambda x: (f(x), x))
2039
2040    def repartition(self, numPartitions):
2041        """
2042         Return a new RDD that has exactly numPartitions partitions.
2043
2044         Can increase or decrease the level of parallelism in this RDD.
2045         Internally, this uses a shuffle to redistribute data.
2046         If you are decreasing the number of partitions in this RDD, consider
2047         using `coalesce`, which can avoid performing a shuffle.
2048
2049         >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
2050         >>> sorted(rdd.glom().collect())
2051         [[1], [2, 3], [4, 5], [6, 7]]
2052         >>> len(rdd.repartition(2).glom().collect())
2053         2
2054         >>> len(rdd.repartition(10).glom().collect())
2055         10
2056        """
2057        return self.coalesce(numPartitions, shuffle=True)
2058
2059    def coalesce(self, numPartitions, shuffle=False):
2060        """
2061        Return a new RDD that is reduced into `numPartitions` partitions.
2062
2063        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
2064        [[1], [2, 3], [4, 5]]
2065        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
2066        [[1, 2, 3, 4, 5]]
2067        """
2068        if shuffle:
2069            # Decrease the batch size in order to distribute evenly the elements across output
2070            # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
2071            batchSize = min(10, self.ctx._batchSize or 1024)
2072            ser = BatchedSerializer(PickleSerializer(), batchSize)
2073            selfCopy = self._reserialize(ser)
2074            jrdd_deserializer = selfCopy._jrdd_deserializer
2075            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
2076        else:
2077            jrdd_deserializer = self._jrdd_deserializer
2078            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
2079        return RDD(jrdd, self.ctx, jrdd_deserializer)
2080
2081    def zip(self, other):
2082        """
2083        Zips this RDD with another one, returning key-value pairs with the
2084        first element in each RDD second element in each RDD, etc. Assumes
2085        that the two RDDs have the same number of partitions and the same
2086        number of elements in each partition (e.g. one was made through
2087        a map on the other).
2088
2089        >>> x = sc.parallelize(range(0,5))
2090        >>> y = sc.parallelize(range(1000, 1005))
2091        >>> x.zip(y).collect()
2092        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
2093        """
2094        def get_batch_size(ser):
2095            if isinstance(ser, BatchedSerializer):
2096                return ser.batchSize
2097            return 1  # not batched
2098
2099        def batch_as(rdd, batchSize):
2100            return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize))
2101
2102        my_batch = get_batch_size(self._jrdd_deserializer)
2103        other_batch = get_batch_size(other._jrdd_deserializer)
2104        if my_batch != other_batch or not my_batch:
2105            # use the smallest batchSize for both of them
2106            batchSize = min(my_batch, other_batch)
2107            if batchSize <= 0:
2108                # auto batched or unlimited
2109                batchSize = 100
2110            other = batch_as(other, batchSize)
2111            self = batch_as(self, batchSize)
2112
2113        if self.getNumPartitions() != other.getNumPartitions():
2114            raise ValueError("Can only zip with RDD which has the same number of partitions")
2115
2116        # There will be an Exception in JVM if there are different number
2117        # of items in each partitions.
2118        pairRDD = self._jrdd.zip(other._jrdd)
2119        deserializer = PairDeserializer(self._jrdd_deserializer,
2120                                        other._jrdd_deserializer)
2121        return RDD(pairRDD, self.ctx, deserializer)
2122
2123    def zipWithIndex(self):
2124        """
2125        Zips this RDD with its element indices.
2126
2127        The ordering is first based on the partition index and then the
2128        ordering of items within each partition. So the first item in
2129        the first partition gets index 0, and the last item in the last
2130        partition receives the largest index.
2131
2132        This method needs to trigger a spark job when this RDD contains
2133        more than one partitions.
2134
2135        >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
2136        [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
2137        """
2138        starts = [0]
2139        if self.getNumPartitions() > 1:
2140            nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
2141            for i in range(len(nums) - 1):
2142                starts.append(starts[-1] + nums[i])
2143
2144        def func(k, it):
2145            for i, v in enumerate(it, starts[k]):
2146                yield v, i
2147
2148        return self.mapPartitionsWithIndex(func)
2149
2150    def zipWithUniqueId(self):
2151        """
2152        Zips this RDD with generated unique Long ids.
2153
2154        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
2155        n is the number of partitions. So there may exist gaps, but this
2156        method won't trigger a spark job, which is different from
2157        L{zipWithIndex}
2158
2159        >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
2160        [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
2161        """
2162        n = self.getNumPartitions()
2163
2164        def func(k, it):
2165            for i, v in enumerate(it):
2166                yield v, i * n + k
2167
2168        return self.mapPartitionsWithIndex(func)
2169
2170    def name(self):
2171        """
2172        Return the name of this RDD.
2173        """
2174        n = self._jrdd.name()
2175        if n:
2176            return n
2177
2178    @ignore_unicode_prefix
2179    def setName(self, name):
2180        """
2181        Assign a name to this RDD.
2182
2183        >>> rdd1 = sc.parallelize([1, 2])
2184        >>> rdd1.setName('RDD1').name()
2185        u'RDD1'
2186        """
2187        self._jrdd.setName(name)
2188        return self
2189
2190    def toDebugString(self):
2191        """
2192        A description of this RDD and its recursive dependencies for debugging.
2193        """
2194        debug_string = self._jrdd.toDebugString()
2195        if debug_string:
2196            return debug_string.encode('utf-8')
2197
2198    def getStorageLevel(self):
2199        """
2200        Get the RDD's current storage level.
2201
2202        >>> rdd1 = sc.parallelize([1,2])
2203        >>> rdd1.getStorageLevel()
2204        StorageLevel(False, False, False, False, 1)
2205        >>> print(rdd1.getStorageLevel())
2206        Serialized 1x Replicated
2207        """
2208        java_storage_level = self._jrdd.getStorageLevel()
2209        storage_level = StorageLevel(java_storage_level.useDisk(),
2210                                     java_storage_level.useMemory(),
2211                                     java_storage_level.useOffHeap(),
2212                                     java_storage_level.deserialized(),
2213                                     java_storage_level.replication())
2214        return storage_level
2215
2216    def _defaultReducePartitions(self):
2217        """
2218        Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
2219        If spark.default.parallelism is set, then we'll use the value from SparkContext
2220        defaultParallelism, otherwise we'll use the number of partitions in this RDD.
2221
2222        This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
2223        the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
2224        be inherent.
2225        """
2226        if self.ctx._conf.contains("spark.default.parallelism"):
2227            return self.ctx.defaultParallelism
2228        else:
2229            return self.getNumPartitions()
2230
2231    def lookup(self, key):
2232        """
2233        Return the list of values in the RDD for key `key`. This operation
2234        is done efficiently if the RDD has a known partitioner by only
2235        searching the partition that the key maps to.
2236
2237        >>> l = range(1000)
2238        >>> rdd = sc.parallelize(zip(l, l), 10)
2239        >>> rdd.lookup(42)  # slow
2240        [42]
2241        >>> sorted = rdd.sortByKey()
2242        >>> sorted.lookup(42)  # fast
2243        [42]
2244        >>> sorted.lookup(1024)
2245        []
2246        >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
2247        >>> list(rdd2.lookup(('a', 'b'))[0])
2248        ['c']
2249        """
2250        values = self.filter(lambda kv: kv[0] == key).values()
2251
2252        if self.partitioner is not None:
2253            return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)])
2254
2255        return values.collect()
2256
2257    def _to_java_object_rdd(self):
2258        """ Return a JavaRDD of Object by unpickling
2259
2260        It will convert each Python object into Java object by Pyrolite, whenever the
2261        RDD is serialized in batch or not.
2262        """
2263        rdd = self._pickled()
2264        return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
2265
2266    def countApprox(self, timeout, confidence=0.95):
2267        """
2268        .. note:: Experimental
2269
2270        Approximate version of count() that returns a potentially incomplete
2271        result within a timeout, even if not all tasks have finished.
2272
2273        >>> rdd = sc.parallelize(range(1000), 10)
2274        >>> rdd.countApprox(1000, 1.0)
2275        1000
2276        """
2277        drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
2278        return int(drdd.sumApprox(timeout, confidence))
2279
2280    def sumApprox(self, timeout, confidence=0.95):
2281        """
2282        .. note:: Experimental
2283
2284        Approximate operation to return the sum within a timeout
2285        or meet the confidence.
2286
2287        >>> rdd = sc.parallelize(range(1000), 10)
2288        >>> r = sum(range(1000))
2289        >>> abs(rdd.sumApprox(1000) - r) / r < 0.05
2290        True
2291        """
2292        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
2293        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
2294        r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
2295        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
2296
2297    def meanApprox(self, timeout, confidence=0.95):
2298        """
2299        .. note:: Experimental
2300
2301        Approximate operation to return the mean within a timeout
2302        or meet the confidence.
2303
2304        >>> rdd = sc.parallelize(range(1000), 10)
2305        >>> r = sum(range(1000)) / 1000.0
2306        >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
2307        True
2308        """
2309        jrdd = self.map(float)._to_java_object_rdd()
2310        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
2311        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
2312        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
2313
2314    def countApproxDistinct(self, relativeSD=0.05):
2315        """
2316        .. note:: Experimental
2317
2318        Return approximate number of distinct elements in the RDD.
2319
2320        The algorithm used is based on streamlib's implementation of
2321        `"HyperLogLog in Practice: Algorithmic Engineering of a State
2322        of The Art Cardinality Estimation Algorithm", available here
2323        <http://dx.doi.org/10.1145/2452376.2452456>`_.
2324
2325        :param relativeSD: Relative accuracy. Smaller values create
2326                           counters that require more space.
2327                           It must be greater than 0.000017.
2328
2329        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
2330        >>> 900 < n < 1100
2331        True
2332        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
2333        >>> 16 < n < 24
2334        True
2335        """
2336        if relativeSD < 0.000017:
2337            raise ValueError("relativeSD should be greater than 0.000017")
2338        # the hash space in Java is 2^32
2339        hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
2340        return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
2341
2342    def toLocalIterator(self):
2343        """
2344        Return an iterator that contains all of the elements in this RDD.
2345        The iterator will consume as much memory as the largest partition in this RDD.
2346
2347        >>> rdd = sc.parallelize(range(10))
2348        >>> [x for x in rdd.toLocalIterator()]
2349        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2350        """
2351        with SCCallSiteSync(self.context) as css:
2352            port = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())
2353        return _load_from_socket(port, self._jrdd_deserializer)
2354
2355
2356def _prepare_for_python_RDD(sc, command):
2357    # the serialized command will be compressed by broadcast
2358    ser = CloudPickleSerializer()
2359    pickled_command = ser.dumps(command)
2360    if len(pickled_command) > (1 << 20):  # 1M
2361        # The broadcast will have same life cycle as created PythonRDD
2362        broadcast = sc.broadcast(pickled_command)
2363        pickled_command = ser.dumps(broadcast)
2364    broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
2365    sc._pickled_broadcast_vars.clear()
2366    return pickled_command, broadcast_vars, sc.environment, sc._python_includes
2367
2368
2369def _wrap_function(sc, func, deserializer, serializer, profiler=None):
2370    assert deserializer, "deserializer should not be empty"
2371    assert serializer, "serializer should not be empty"
2372    command = (func, profiler, deserializer, serializer)
2373    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2374    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2375                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
2376
2377
2378class PipelinedRDD(RDD):
2379
2380    """
2381    Pipelined maps:
2382
2383    >>> rdd = sc.parallelize([1, 2, 3, 4])
2384    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
2385    [4, 8, 12, 16]
2386    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
2387    [4, 8, 12, 16]
2388
2389    Pipelined reduces:
2390    >>> from operator import add
2391    >>> rdd.map(lambda x: 2 * x).reduce(add)
2392    20
2393    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
2394    20
2395    """
2396
2397    def __init__(self, prev, func, preservesPartitioning=False):
2398        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
2399            # This transformation is the first in its stage:
2400            self.func = func
2401            self.preservesPartitioning = preservesPartitioning
2402            self._prev_jrdd = prev._jrdd
2403            self._prev_jrdd_deserializer = prev._jrdd_deserializer
2404        else:
2405            prev_func = prev.func
2406
2407            def pipeline_func(split, iterator):
2408                return func(split, prev_func(split, iterator))
2409            self.func = pipeline_func
2410            self.preservesPartitioning = \
2411                prev.preservesPartitioning and preservesPartitioning
2412            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
2413            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
2414        self.is_cached = False
2415        self.is_checkpointed = False
2416        self.ctx = prev.ctx
2417        self.prev = prev
2418        self._jrdd_val = None
2419        self._id = None
2420        self._jrdd_deserializer = self.ctx.serializer
2421        self._bypass_serializer = False
2422        self.partitioner = prev.partitioner if self.preservesPartitioning else None
2423
2424    def getNumPartitions(self):
2425        return self._prev_jrdd.partitions().size()
2426
2427    @property
2428    def _jrdd(self):
2429        if self._jrdd_val:
2430            return self._jrdd_val
2431        if self._bypass_serializer:
2432            self._jrdd_deserializer = NoOpSerializer()
2433
2434        if self.ctx.profiler_collector:
2435            profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
2436        else:
2437            profiler = None
2438
2439        wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
2440                                      self._jrdd_deserializer, profiler)
2441        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2442                                             self.preservesPartitioning)
2443        self._jrdd_val = python_rdd.asJavaRDD()
2444
2445        if profiler:
2446            self._id = self._jrdd_val.id()
2447            self.ctx.profiler_collector.add_profiler(self._id, profiler)
2448        return self._jrdd_val
2449
2450    def id(self):
2451        if self._id is None:
2452            self._id = self._jrdd.id()
2453        return self._id
2454
2455    def _is_pipelinable(self):
2456        return not (self.is_cached or self.is_checkpointed)
2457
2458
2459def _test():
2460    import doctest
2461    from pyspark.context import SparkContext
2462    globs = globals().copy()
2463    # The small batch size here ensures that we see multiple batches,
2464    # even in these small test examples:
2465    globs['sc'] = SparkContext('local[4]', 'PythonTest')
2466    (failure_count, test_count) = doctest.testmod(
2467        globs=globs, optionflags=doctest.ELLIPSIS)
2468    globs['sc'].stop()
2469    if failure_count:
2470        exit(-1)
2471
2472
2473if __name__ == "__main__":
2474    _test()
2475