1#
2# Licensed to the Apache Software Foundation (ASF) under one or more
3# contributor license agreements.  See the NOTICE file distributed with
4# this work for additional information regarding copyright ownership.
5# The ASF licenses this file to You under the Apache License, Version 2.0
6# (the "License"); you may not use this file except in compliance with
7# the License.  You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import sys
19import json
20
21if sys.version >= '3':
22    intlike = int
23    basestring = unicode = str
24else:
25    intlike = (int, long)
26
27from abc import ABCMeta, abstractmethod
28
29from pyspark import since, keyword_only
30from pyspark.rdd import ignore_unicode_prefix
31from pyspark.sql.column import _to_seq
32from pyspark.sql.readwriter import OptionUtils, to_str
33from pyspark.sql.types import *
34from pyspark.sql.utils import StreamingQueryException
35
36__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
37
38
39class StreamingQuery(object):
40    """
41    A handle to a query that is executing continuously in the background as new data arrives.
42    All these methods are thread-safe.
43
44    .. note:: Experimental
45
46    .. versionadded:: 2.0
47    """
48
49    def __init__(self, jsq):
50        self._jsq = jsq
51
52    @property
53    @since(2.0)
54    def id(self):
55        """Returns the unique id of this query that persists across restarts from checkpoint data.
56        That is, this id is generated when a query is started for the first time, and
57        will be the same every time it is restarted from checkpoint data.
58        There can only be one query with the same id active in a Spark cluster.
59        Also see, `runId`.
60        """
61        return self._jsq.id().toString()
62
63    @property
64    @since(2.1)
65    def runId(self):
66        """Returns the unique id of this query that does not persist across restarts. That is, every
67        query that is started (or restarted from checkpoint) will have a different runId.
68        """
69        return self._jsq.runId().toString()
70
71    @property
72    @since(2.0)
73    def name(self):
74        """Returns the user-specified name of the query, or null if not specified.
75        This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
76        as `dataframe.writeStream.queryName("query").start()`.
77        This name, if set, must be unique across all active queries.
78        """
79        return self._jsq.name()
80
81    @property
82    @since(2.0)
83    def isActive(self):
84        """Whether this streaming query is currently active or not.
85        """
86        return self._jsq.isActive()
87
88    @since(2.0)
89    def awaitTermination(self, timeout=None):
90        """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
91        exception. If the query has terminated with an exception, then the exception will be thrown.
92        If `timeout` is set, it returns whether the query has terminated or not within the
93        `timeout` seconds.
94
95        If the query has terminated, then all subsequent calls to this method will either return
96        immediately (if the query was terminated by :func:`stop()`), or throw the exception
97        immediately (if the query has terminated with exception).
98
99        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
100        """
101        if timeout is not None:
102            if not isinstance(timeout, (int, float)) or timeout < 0:
103                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
104            return self._jsq.awaitTermination(int(timeout * 1000))
105        else:
106            return self._jsq.awaitTermination()
107
108    @property
109    @since(2.1)
110    def status(self):
111        """
112        Returns the current status of the query.
113        """
114        return json.loads(self._jsq.status().json())
115
116    @property
117    @since(2.1)
118    def recentProgress(self):
119        """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
120        The number of progress updates retained for each stream is configured by Spark session
121        configuration `spark.sql.streaming.numRecentProgressUpdates`.
122        """
123        return [json.loads(p.json()) for p in self._jsq.recentProgress()]
124
125    @property
126    @since(2.1)
127    def lastProgress(self):
128        """
129        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
130        None if there were no progress updates
131        :return: a map
132        """
133        lastProgress = self._jsq.lastProgress()
134        if lastProgress:
135            return json.loads(lastProgress.json())
136        else:
137            return None
138
139    @since(2.0)
140    def processAllAvailable(self):
141        """Blocks until all available data in the source has been processed and committed to the
142        sink. This method is intended for testing.
143
144        .. note:: In the case of continually arriving data, this method may block forever.
145            Additionally, this method is only guaranteed to block until data that has been
146            synchronously appended data to a stream source prior to invocation.
147            (i.e. `getOffset` must immediately reflect the addition).
148        """
149        return self._jsq.processAllAvailable()
150
151    @since(2.0)
152    def stop(self):
153        """Stop this streaming query.
154        """
155        self._jsq.stop()
156
157    @since(2.1)
158    def explain(self, extended=False):
159        """Prints the (logical and physical) plans to the console for debugging purpose.
160
161        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
162
163        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
164        >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
165        >>> sq.explain()
166        == Physical Plan ==
167        ...
168        >>> sq.explain(True)
169        == Parsed Logical Plan ==
170        ...
171        == Analyzed Logical Plan ==
172        ...
173        == Optimized Logical Plan ==
174        ...
175        == Physical Plan ==
176        ...
177        >>> sq.stop()
178        """
179        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
180        # We should print it in the Python process.
181        print(self._jsq.explainInternal(extended))
182
183    @since(2.1)
184    def exception(self):
185        """
186        :return: the StreamingQueryException if the query was terminated by an exception, or None.
187        """
188        if self._jsq.exception().isDefined():
189            je = self._jsq.exception().get()
190            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
191            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
192            return StreamingQueryException(msg, stackTrace)
193        else:
194            return None
195
196
197class StreamingQueryManager(object):
198    """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
199
200    .. note:: Experimental
201
202    .. versionadded:: 2.0
203    """
204
205    def __init__(self, jsqm):
206        self._jsqm = jsqm
207
208    @property
209    @ignore_unicode_prefix
210    @since(2.0)
211    def active(self):
212        """Returns a list of active queries associated with this SQLContext
213
214        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
215        >>> sqm = spark.streams
216        >>> # get the list of active streaming queries
217        >>> [q.name for q in sqm.active]
218        [u'this_query']
219        >>> sq.stop()
220        """
221        return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
222
223    @ignore_unicode_prefix
224    @since(2.0)
225    def get(self, id):
226        """Returns an active query from this SQLContext or throws exception if an active query
227        with this name doesn't exist.
228
229        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
230        >>> sq.name
231        u'this_query'
232        >>> sq = spark.streams.get(sq.id)
233        >>> sq.isActive
234        True
235        >>> sq = sqlContext.streams.get(sq.id)
236        >>> sq.isActive
237        True
238        >>> sq.stop()
239        """
240        return StreamingQuery(self._jsqm.get(id))
241
242    @since(2.0)
243    def awaitAnyTermination(self, timeout=None):
244        """Wait until any of the queries on the associated SQLContext has terminated since the
245        creation of the context, or since :func:`resetTerminated()` was called. If any query was
246        terminated with an exception, then the exception will be thrown.
247        If `timeout` is set, it returns whether the query has terminated or not within the
248        `timeout` seconds.
249
250        If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
251        either return immediately (if the query was terminated by :func:`query.stop()`),
252        or throw the exception immediately (if the query was terminated with exception). Use
253        :func:`resetTerminated()` to clear past terminations and wait for new terminations.
254
255        In the case where multiple queries have terminated since :func:`resetTermination()`
256        was called, if any query has terminated with exception, then :func:`awaitAnyTermination()`
257        will throw any of the exception. For correctly documenting exceptions across multiple
258        queries, users need to stop all of them after any of them terminates with exception, and
259        then check the `query.exception()` for each query.
260
261        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
262        """
263        if timeout is not None:
264            if not isinstance(timeout, (int, float)) or timeout < 0:
265                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
266            return self._jsqm.awaitAnyTermination(int(timeout * 1000))
267        else:
268            return self._jsqm.awaitAnyTermination()
269
270    @since(2.0)
271    def resetTerminated(self):
272        """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
273        again to wait for new terminations.
274
275        >>> spark.streams.resetTerminated()
276        """
277        self._jsqm.resetTerminated()
278
279
280class Trigger(object):
281    """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
282
283    .. note:: Experimental
284
285    .. versionadded:: 2.0
286    """
287
288    __metaclass__ = ABCMeta
289
290    @abstractmethod
291    def _to_java_trigger(self, sqlContext):
292        """Internal method to construct the trigger on the jvm.
293        """
294        pass
295
296
297class ProcessingTime(Trigger):
298    """A trigger that runs a query periodically based on the processing time. If `interval` is 0,
299    the query will run as fast as possible.
300
301    The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...
302
303    .. note:: Experimental
304
305    .. versionadded:: 2.0
306    """
307
308    def __init__(self, interval):
309        if type(interval) != str or len(interval.strip()) == 0:
310            raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
311        self.interval = interval
312
313    def _to_java_trigger(self, sqlContext):
314        return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
315            self.interval)
316
317
318class DataStreamReader(OptionUtils):
319    """
320    Interface used to load a streaming :class:`DataFrame` from external storage systems
321    (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
322    to access this.
323
324    .. note:: Experimental.
325
326    .. versionadded:: 2.0
327    """
328
329    def __init__(self, spark):
330        self._jreader = spark._ssql_ctx.readStream()
331        self._spark = spark
332
333    def _df(self, jdf):
334        from pyspark.sql.dataframe import DataFrame
335        return DataFrame(jdf, self._spark)
336
337    @since(2.0)
338    def format(self, source):
339        """Specifies the input data source format.
340
341        .. note:: Experimental.
342
343        :param source: string, name of the data source, e.g. 'json', 'parquet'.
344
345        >>> s = spark.readStream.format("text")
346        """
347        self._jreader = self._jreader.format(source)
348        return self
349
350    @since(2.0)
351    def schema(self, schema):
352        """Specifies the input schema.
353
354        Some data sources (e.g. JSON) can infer the input schema automatically from data.
355        By specifying the schema here, the underlying data source can skip the schema
356        inference step, and thus speed up data loading.
357
358        .. note:: Experimental.
359
360        :param schema: a :class:`pyspark.sql.types.StructType` object
361
362        >>> s = spark.readStream.schema(sdf_schema)
363        """
364        from pyspark.sql import SparkSession
365        if not isinstance(schema, StructType):
366            raise TypeError("schema should be StructType")
367        spark = SparkSession.builder.getOrCreate()
368        jschema = spark._jsparkSession.parseDataType(schema.json())
369        self._jreader = self._jreader.schema(jschema)
370        return self
371
372    @since(2.0)
373    def option(self, key, value):
374        """Adds an input option for the underlying data source.
375
376        .. note:: Experimental.
377
378        >>> s = spark.readStream.option("x", 1)
379        """
380        self._jreader = self._jreader.option(key, to_str(value))
381        return self
382
383    @since(2.0)
384    def options(self, **options):
385        """Adds input options for the underlying data source.
386
387        .. note:: Experimental.
388
389        >>> s = spark.readStream.options(x="1", y=2)
390        """
391        for k in options:
392            self._jreader = self._jreader.option(k, to_str(options[k]))
393        return self
394
395    @since(2.0)
396    def load(self, path=None, format=None, schema=None, **options):
397        """Loads a data stream from a data source and returns it as a :class`DataFrame`.
398
399        .. note:: Experimental.
400
401        :param path: optional string for file-system backed data sources.
402        :param format: optional string for format of the data source. Default to 'parquet'.
403        :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema.
404        :param options: all other string options
405
406        >>> json_sdf = spark.readStream.format("json") \\
407        ...     .schema(sdf_schema) \\
408        ...     .load(tempfile.mkdtemp())
409        >>> json_sdf.isStreaming
410        True
411        >>> json_sdf.schema == sdf_schema
412        True
413        """
414        if format is not None:
415            self.format(format)
416        if schema is not None:
417            self.schema(schema)
418        self.options(**options)
419        if path is not None:
420            if type(path) != str or len(path.strip()) == 0:
421                raise ValueError("If the path is provided for stream, it needs to be a " +
422                                 "non-empty string. List of paths are not supported.")
423            return self._df(self._jreader.load(path))
424        else:
425            return self._df(self._jreader.load())
426
427    @since(2.0)
428    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
429             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
430             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
431             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
432             timestampFormat=None):
433        """
434        Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
435        <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
436
437        If the ``schema`` parameter is not specified, this function goes
438        through the input once to determine the input schema.
439
440        .. note:: Experimental.
441
442        :param path: string represents path to the JSON dataset,
443                     or RDD of Strings storing JSON objects.
444        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
445        :param primitivesAsString: infers all primitive values as a string type. If None is set,
446                                   it uses the default value, ``false``.
447        :param prefersDecimal: infers all floating-point values as a decimal type. If the values
448                               do not fit in decimal, then it infers them as doubles. If None is
449                               set, it uses the default value, ``false``.
450        :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
451                              it uses the default value, ``false``.
452        :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
453                                        it uses the default value, ``false``.
454        :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
455                                        set, it uses the default value, ``true``.
456        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
457                                        set, it uses the default value, ``false``.
458        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
459                                                   using backslash quoting mechanism. If None is
460                                                   set, it uses the default value, ``false``.
461        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
462                     set, it uses the default value, ``PERMISSIVE``.
463
464                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
465                  record and puts the malformed string into a new field configured by \
466                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
467                 ``null`` for extra fields.
468                *  ``DROPMALFORMED`` : ignores the whole corrupted records.
469                *  ``FAILFAST`` : throws an exception when it meets corrupted records.
470
471        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
472                                          created by ``PERMISSIVE`` mode. This overrides
473                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
474                                          it uses the value specified in
475                                          ``spark.sql.columnNameOfCorruptRecord``.
476        :param dateFormat: sets the string that indicates a date format. Custom date formats
477                           follow the formats at ``java.text.SimpleDateFormat``. This
478                           applies to date type. If None is set, it uses the
479                           default value value, ``yyyy-MM-dd``.
480        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
481                                formats follow the formats at ``java.text.SimpleDateFormat``.
482                                This applies to timestamp type. If None is set, it uses the
483                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
484
485        >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
486        >>> json_sdf.isStreaming
487        True
488        >>> json_sdf.schema == sdf_schema
489        True
490        """
491        self._set_opts(
492            schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
493            allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
494            allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
495            allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
496            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
497            timestampFormat=timestampFormat)
498        if isinstance(path, basestring):
499            return self._df(self._jreader.json(path))
500        else:
501            raise TypeError("path can be only a single string")
502
503    @since(2.0)
504    def parquet(self, path):
505        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
506
507        You can set the following Parquet-specific option(s) for reading Parquet files:
508            * ``mergeSchema``: sets whether we should merge schemas collected from all \
509                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
510                The default value is specified in ``spark.sql.parquet.mergeSchema``.
511
512        .. note:: Experimental.
513
514        >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
515        >>> parquet_sdf.isStreaming
516        True
517        >>> parquet_sdf.schema == sdf_schema
518        True
519        """
520        if isinstance(path, basestring):
521            return self._df(self._jreader.parquet(path))
522        else:
523            raise TypeError("path can be only a single string")
524
525    @ignore_unicode_prefix
526    @since(2.0)
527    def text(self, path):
528        """
529        Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
530        string column named "value", and followed by partitioned columns if there
531        are any.
532
533        Each line in the text file is a new row in the resulting DataFrame.
534
535        .. note:: Experimental.
536
537        :param paths: string, or list of strings, for input path(s).
538
539        >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
540        >>> text_sdf.isStreaming
541        True
542        >>> "value" in str(text_sdf.schema)
543        True
544        """
545        if isinstance(path, basestring):
546            return self._df(self._jreader.text(path))
547        else:
548            raise TypeError("path can be only a single string")
549
550    @since(2.0)
551    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
552            comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
553            ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
554            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
555            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
556        """Loads a CSV file stream and returns the result as a  :class:`DataFrame`.
557
558        This function will go through the input once to determine the input schema if
559        ``inferSchema`` is enabled. To avoid going through the entire data once, disable
560        ``inferSchema`` option or specify the schema explicitly using ``schema``.
561
562        .. note:: Experimental.
563
564        :param path: string, or list of strings, for input path(s).
565        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
566        :param sep: sets the single character as a separator for each field and value.
567                    If None is set, it uses the default value, ``,``.
568        :param encoding: decodes the CSV files by the given encoding type. If None is set,
569                         it uses the default value, ``UTF-8``.
570        :param quote: sets the single character used for escaping quoted values where the
571                      separator can be part of the value. If None is set, it uses the default
572                      value, ``"``. If you would like to turn off quotations, you need to set an
573                      empty string.
574        :param escape: sets the single character used for escaping quotes inside an already
575                       quoted value. If None is set, it uses the default value, ``\``.
576        :param comment: sets the single character used for skipping lines beginning with this
577                        character. By default (None), it is disabled.
578        :param header: uses the first line as names of columns. If None is set, it uses the
579                       default value, ``false``.
580        :param inferSchema: infers the input schema automatically from data. It requires one extra
581                       pass over the data. If None is set, it uses the default value, ``false``.
582        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
583                                        being read should be skipped. If None is set, it uses
584                                        the default value, ``false``.
585        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
586                                         being read should be skipped. If None is set, it uses
587                                         the default value, ``false``.
588        :param nullValue: sets the string representation of a null value. If None is set, it uses
589                          the default value, empty string. Since 2.0.1, this ``nullValue`` param
590                          applies to all supported types including the string type.
591        :param nanValue: sets the string representation of a non-number value. If None is set, it
592                         uses the default value, ``NaN``.
593        :param positiveInf: sets the string representation of a positive infinity value. If None
594                            is set, it uses the default value, ``Inf``.
595        :param negativeInf: sets the string representation of a negative infinity value. If None
596                            is set, it uses the default value, ``Inf``.
597        :param dateFormat: sets the string that indicates a date format. Custom date formats
598                           follow the formats at ``java.text.SimpleDateFormat``. This
599                           applies to date type. If None is set, it uses the
600                           default value value, ``yyyy-MM-dd``.
601        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
602                                formats follow the formats at ``java.text.SimpleDateFormat``.
603                                This applies to timestamp type. If None is set, it uses the
604                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
605        :param maxColumns: defines a hard limit of how many columns a record can have. If None is
606                           set, it uses the default value, ``20480``.
607        :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
608                                  value being read. If None is set, it uses the default value,
609                                  ``-1`` meaning unlimited length.
610        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
611                     set, it uses the default value, ``PERMISSIVE``.
612
613                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
614                    When a schema is set by user, it sets ``null`` for extra fields.
615                * ``DROPMALFORMED`` : ignores the whole corrupted records.
616                * ``FAILFAST`` : throws an exception when it meets corrupted records.
617
618        >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
619        >>> csv_sdf.isStreaming
620        True
621        >>> csv_sdf.schema == sdf_schema
622        True
623        """
624        self._set_opts(
625            schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
626            header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
627            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
628            nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
629            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
630            maxCharsPerColumn=maxCharsPerColumn,
631            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
632        if isinstance(path, basestring):
633            return self._df(self._jreader.csv(path))
634        else:
635            raise TypeError("path can be only a single string")
636
637
638class DataStreamWriter(object):
639    """
640    Interface used to write a streaming :class:`DataFrame` to external storage systems
641    (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
642    to access this.
643
644    .. note:: Experimental.
645
646    .. versionadded:: 2.0
647    """
648
649    def __init__(self, df):
650        self._df = df
651        self._spark = df.sql_ctx
652        self._jwrite = df._jdf.writeStream()
653
654    def _sq(self, jsq):
655        from pyspark.sql.streaming import StreamingQuery
656        return StreamingQuery(jsq)
657
658    @since(2.0)
659    def outputMode(self, outputMode):
660        """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
661
662        Options include:
663
664        * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to
665           the sink
666        * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
667           every time these is some updates
668        * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
669           written to the sink every time there are some updates. If the query doesn't contain
670           aggregations, it will be equivalent to `append` mode.
671
672       .. note:: Experimental.
673
674        >>> writer = sdf.writeStream.outputMode('append')
675        """
676        if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
677            raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
678        self._jwrite = self._jwrite.outputMode(outputMode)
679        return self
680
681    @since(2.0)
682    def format(self, source):
683        """Specifies the underlying output data source.
684
685        .. note:: Experimental.
686
687        :param source: string, name of the data source, which for now can be 'parquet'.
688
689        >>> writer = sdf.writeStream.format('json')
690        """
691        self._jwrite = self._jwrite.format(source)
692        return self
693
694    @since(2.0)
695    def option(self, key, value):
696        """Adds an output option for the underlying data source.
697
698        .. note:: Experimental.
699        """
700        self._jwrite = self._jwrite.option(key, to_str(value))
701        return self
702
703    @since(2.0)
704    def options(self, **options):
705        """Adds output options for the underlying data source.
706
707       .. note:: Experimental.
708        """
709        for k in options:
710            self._jwrite = self._jwrite.option(k, to_str(options[k]))
711        return self
712
713    @since(2.0)
714    def partitionBy(self, *cols):
715        """Partitions the output by the given columns on the file system.
716
717        If specified, the output is laid out on the file system similar
718        to Hive's partitioning scheme.
719
720        .. note:: Experimental.
721
722        :param cols: name of columns
723
724        """
725        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
726            cols = cols[0]
727        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
728        return self
729
730    @since(2.0)
731    def queryName(self, queryName):
732        """Specifies the name of the :class:`StreamingQuery` that can be started with
733        :func:`start`. This name must be unique among all the currently active queries
734        in the associated SparkSession.
735
736        .. note:: Experimental.
737
738        :param queryName: unique name for the query
739
740        >>> writer = sdf.writeStream.queryName('streaming_query')
741        """
742        if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
743            raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
744        self._jwrite = self._jwrite.queryName(queryName)
745        return self
746
747    @keyword_only
748    @since(2.0)
749    def trigger(self, processingTime=None):
750        """Set the trigger for the stream query. If this is not set it will run the query as fast
751        as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
752
753        .. note:: Experimental.
754
755        :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
756
757        >>> # trigger the query for execution every 5 seconds
758        >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
759        """
760        from pyspark.sql.streaming import ProcessingTime
761        trigger = None
762        if processingTime is not None:
763            if type(processingTime) != str or len(processingTime.strip()) == 0:
764                raise ValueError('The processing time must be a non empty string. Got: %s' %
765                                 processingTime)
766            trigger = ProcessingTime(processingTime)
767        if trigger is None:
768            raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
769        self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
770        return self
771
772    @ignore_unicode_prefix
773    @since(2.0)
774    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
775              **options):
776        """Streams the contents of the :class:`DataFrame` to a data source.
777
778        The data source is specified by the ``format`` and a set of ``options``.
779        If ``format`` is not specified, the default data source configured by
780        ``spark.sql.sources.default`` will be used.
781
782        .. note:: Experimental.
783
784        :param path: the path in a Hadoop supported file system
785        :param format: the format used to save
786        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
787                           streaming sink.
788
789            * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the
790              sink
791            * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
792               every time these is some updates
793            * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
794              written to the sink every time there are some updates. If the query doesn't contain
795              aggregations, it will be equivalent to `append` mode.
796        :param partitionBy: names of partitioning columns
797        :param queryName: unique name for the query
798        :param options: All other string options. You may want to provide a `checkpointLocation`
799                        for most streams, however it is not required for a `memory` stream.
800
801        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
802        >>> sq.isActive
803        True
804        >>> sq.name
805        u'this_query'
806        >>> sq.stop()
807        >>> sq.isActive
808        False
809        >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
810        ...     queryName='that_query', outputMode="append", format='memory')
811        >>> sq.name
812        u'that_query'
813        >>> sq.isActive
814        True
815        >>> sq.stop()
816        """
817        self.options(**options)
818        if outputMode is not None:
819            self.outputMode(outputMode)
820        if partitionBy is not None:
821            self.partitionBy(partitionBy)
822        if format is not None:
823            self.format(format)
824        if queryName is not None:
825            self.queryName(queryName)
826        if path is None:
827            return self._sq(self._jwrite.start())
828        else:
829            return self._sq(self._jwrite.start(path))
830
831
832def _test():
833    import doctest
834    import os
835    import tempfile
836    from pyspark.sql import Row, SparkSession, SQLContext
837    import pyspark.sql.streaming
838
839    os.chdir(os.environ["SPARK_HOME"])
840
841    globs = pyspark.sql.streaming.__dict__.copy()
842    try:
843        spark = SparkSession.builder.getOrCreate()
844    except py4j.protocol.Py4JError:
845        spark = SparkSession(sc)
846
847    globs['tempfile'] = tempfile
848    globs['os'] = os
849    globs['spark'] = spark
850    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
851    globs['sdf'] = \
852        spark.readStream.format('text').load('python/test_support/sql/streaming')
853    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
854    globs['df'] = \
855        globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
856
857    (failure_count, test_count) = doctest.testmod(
858        pyspark.sql.streaming, globs=globs,
859        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
860    globs['spark'].stop()
861
862    if failure_count:
863        exit(-1)
864
865
866if __name__ == "__main__":
867    _test()
868