#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import json

if sys.version >= '3':
    intlike = int
    basestring = unicode = str
else:
    intlike = (int, long)

from abc import ABCMeta, abstractmethod

from pyspark import since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import _to_seq
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
from pyspark.sql.utils import StreamingQueryException

__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]


class StreamingQuery(object):
    """
    A handle to a query that is executing continuously in the background as new data arrives.
    All these methods are thread-safe.

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    def __init__(self, jsq):
        self._jsq = jsq

    @property
    @since(2.0)
    def id(self):
        """Returns the unique id of this query that persists across restarts from checkpoint data.
        That is, this id is generated when a query is started for the first time, and
        will be the same every time it is restarted from checkpoint data.
        There can only be one query with the same id active in a Spark cluster.
        Also see, `runId`.
        """
        return self._jsq.id().toString()

    @property
    @since(2.1)
    def runId(self):
        """Returns the unique id of this query that does not persist across restarts. That is,
        every query that is started (or restarted from checkpoint) will have a different runId.
        """
        return self._jsq.runId().toString()

    @property
    @since(2.0)
    def name(self):
        """Returns the user-specified name of the query, or null if not specified.
        This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
        as `dataframe.writeStream.queryName("query").start()`.
        This name, if set, must be unique across all active queries.
        """
        return self._jsq.name()

    @property
    @since(2.0)
    def isActive(self):
        """Whether this streaming query is currently active or not.
        """
        return self._jsq.isActive()

    @since(2.0)
    def awaitTermination(self, timeout=None):
        """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
        exception. If the query has terminated with an exception, then the exception will be
        thrown.
        If `timeout` is set, it returns whether the query has terminated or not within the
        `timeout` seconds.

        If the query has terminated, then all subsequent calls to this method will either return
        immediately (if the query was terminated by :func:`stop()`), or throw the exception
        immediately (if the query has terminated with exception).

        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
        """
        if timeout is not None:
            if not isinstance(timeout, (int, float)) or timeout < 0:
                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
            return self._jsq.awaitTermination(int(timeout * 1000))
        else:
            return self._jsq.awaitTermination()

    @property
    @since(2.1)
    def status(self):
        """
        Returns the current status of the query.
        """
        return json.loads(self._jsq.status().json())

    @property
    @since(2.1)
    def recentProgress(self):
        """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
        The number of progress updates retained for each stream is configured by Spark session
        configuration `spark.sql.streaming.numRecentProgressUpdates`.
        """
        return [json.loads(p.json()) for p in self._jsq.recentProgress()]

    @property
    @since(2.1)
    def lastProgress(self):
        """
        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
        None if there were no progress updates
        :return: a map
        """
        lastProgress = self._jsq.lastProgress()
        if lastProgress:
            return json.loads(lastProgress.json())
        else:
            return None

    @since(2.0)
    def processAllAvailable(self):
        """Blocks until all available data in the source has been processed and committed to the
        sink. This method is intended for testing.

        .. note:: In the case of continually arriving data, this method may block forever.
            Additionally, this method is only guaranteed to block until data that has been
            synchronously appended to a stream source prior to invocation.
            (i.e. `getOffset` must immediately reflect the addition).
        """
        return self._jsq.processAllAvailable()

    @since(2.0)
    def stop(self):
        """Stop this streaming query.
        """
        self._jsq.stop()

    @since(2.1)
    def explain(self, extended=False):
        """Prints the (logical and physical) plans to the console for debugging purposes.

        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.

        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
        >>> sq.processAllAvailable()  # Wait a bit to generate the runtime plans.
        >>> sq.explain()
        == Physical Plan ==
        ...
        >>> sq.explain(True)
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...
        >>> sq.stop()
        """
        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
        # We should print it in the Python process.
        print(self._jsq.explainInternal(extended))

    @since(2.1)
    def exception(self):
        """
        :return: the StreamingQueryException if the query was terminated by an exception, or None.
        """
        if self._jsq.exception().isDefined():
            je = self._jsq.exception().get()
            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
            return StreamingQueryException(msg, stackTrace)
        else:
            return None
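

# Illustrative sketch (not part of the public API and never called from this
# module): one way to monitor a StreamingQuery started elsewhere. The helper
# name and the 30-second timeout are assumptions made for illustration only.
def _example_monitor_query(sq):
    # Wait up to 30 seconds for the query to terminate; a streaming failure
    # surfaces as a StreamingQueryException (see `awaitTermination` above).
    try:
        if not sq.awaitTermination(30):
            # Still running after the timeout; stop it explicitly.
            sq.stop()
    except StreamingQueryException:
        # `exception()` returns the wrapped error once the query has failed.
        print(sq.exception())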


class StreamingQueryManager(object):
    """A class to manage all the :class:`StreamingQuery` instances that are currently active.

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    def __init__(self, jsqm):
        self._jsqm = jsqm

    @property
    @ignore_unicode_prefix
    @since(2.0)
    def active(self):
        """Returns a list of active queries associated with this SQLContext

        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sqm = spark.streams
        >>> # get the list of active streaming queries
        >>> [q.name for q in sqm.active]
        [u'this_query']
        >>> sq.stop()
        """
        return [StreamingQuery(jsq) for jsq in self._jsqm.active()]

    @ignore_unicode_prefix
    @since(2.0)
    def get(self, id):
        """Returns an active query from this SQLContext or throws exception if an active query
        with this name doesn't exist.

        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sq.name
        u'this_query'
        >>> sq = spark.streams.get(sq.id)
        >>> sq.isActive
        True
        >>> sq = sqlContext.streams.get(sq.id)
        >>> sq.isActive
        True
        >>> sq.stop()
        """
        return StreamingQuery(self._jsqm.get(id))

    @since(2.0)
    def awaitAnyTermination(self, timeout=None):
        """Wait until any of the queries on the associated SQLContext has terminated since the
        creation of the context, or since :func:`resetTerminated()` was called. If any query was
        terminated with an exception, then the exception will be thrown.
        If `timeout` is set, it returns whether the query has terminated or not within the
        `timeout` seconds.

        If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
        either return immediately (if the query was terminated by :func:`query.stop()`),
        or throw the exception immediately (if the query was terminated with exception). Use
        :func:`resetTerminated()` to clear past terminations and wait for new terminations.

        In the case where multiple queries have terminated since :func:`resetTerminated()`
        was called, if any query has terminated with an exception, then
        :func:`awaitAnyTermination()` will throw any of the exceptions. To handle exceptions
        correctly across multiple queries, users need to stop all of them after any of them
        terminates with an exception, and then check the `query.exception()` for each query.

        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
        """
        if timeout is not None:
            if not isinstance(timeout, (int, float)) or timeout < 0:
                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
            return self._jsqm.awaitAnyTermination(int(timeout * 1000))
        else:
            return self._jsqm.awaitAnyTermination()

    @since(2.0)
    def resetTerminated(self):
        """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
        again to wait for new terminations.

        >>> spark.streams.resetTerminated()
        """
        self._jsqm.resetTerminated()
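

# Illustrative sketch (not part of the public API and never called from this
# module): waiting for whichever active query finishes first via
# ``spark.streams`` (a StreamingQueryManager). The 10-second timeout is an
# assumption made for illustration only.
def _example_wait_for_any_query(spark):
    manager = spark.streams
    # Forget past terminations so the next wait only observes new ones.
    manager.resetTerminated()
    try:
        manager.awaitAnyTermination(10)
    except StreamingQueryException:
        # Any of the active queries may have failed, so inspect each of them.
        for q in manager.active:
            if q.exception() is not None:
                print(q.name, q.exception())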


class Trigger(object):
    """Used to indicate how often results should be produced by a :class:`StreamingQuery`.

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def _to_java_trigger(self, sqlContext):
        """Internal method to construct the trigger on the jvm.
        """
        pass


class ProcessingTime(Trigger):
    """A trigger that runs a query periodically based on the processing time. If `interval` is 0,
    the query will run as fast as possible.

    The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    def __init__(self, interval):
        if type(interval) != str or len(interval.strip()) == 0:
            raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
        self.interval = interval

    def _to_java_trigger(self, sqlContext):
        return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
            self.interval)


class DataStreamReader(OptionUtils):
    """
    Interface used to load a streaming :class:`DataFrame` from external storage systems
    (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
    to access this.

    .. note:: Experimental.

    .. versionadded:: 2.0
    """

    def __init__(self, spark):
        self._jreader = spark._ssql_ctx.readStream()
        self._spark = spark

    def _df(self, jdf):
        from pyspark.sql.dataframe import DataFrame
        return DataFrame(jdf, self._spark)

    @since(2.0)
    def format(self, source):
        """Specifies the input data source format.

        .. note:: Experimental.

        :param source: string, name of the data source, e.g. 'json', 'parquet'.

        >>> s = spark.readStream.format("text")
        """
        self._jreader = self._jreader.format(source)
        return self

    @since(2.0)
    def schema(self, schema):
        """Specifies the input schema.

        Some data sources (e.g. JSON) can infer the input schema automatically from data.
        By specifying the schema here, the underlying data source can skip the schema
        inference step, and thus speed up data loading.

        .. note:: Experimental.

        :param schema: a :class:`pyspark.sql.types.StructType` object

        >>> s = spark.readStream.schema(sdf_schema)
        """
        from pyspark.sql import SparkSession
        if not isinstance(schema, StructType):
            raise TypeError("schema should be StructType")
        spark = SparkSession.builder.getOrCreate()
        jschema = spark._jsparkSession.parseDataType(schema.json())
        self._jreader = self._jreader.schema(jschema)
        return self

    @since(2.0)
    def option(self, key, value):
        """Adds an input option for the underlying data source.

        .. note:: Experimental.

        >>> s = spark.readStream.option("x", 1)
        """
        self._jreader = self._jreader.option(key, to_str(value))
        return self

    @since(2.0)
    def options(self, **options):
        """Adds input options for the underlying data source.

        .. note:: Experimental.

        >>> s = spark.readStream.options(x="1", y=2)
        """
        for k in options:
            self._jreader = self._jreader.option(k, to_str(options[k]))
        return self

    @since(2.0)
    def load(self, path=None, format=None, schema=None, **options):
        """Loads a data stream from a data source and returns it as a :class:`DataFrame`.

        .. note:: Experimental.

        :param path: optional string for file-system backed data sources.
        :param format: optional string for format of the data source. Defaults to 'parquet'.
        :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema.
        :param options: all other string options

        >>> json_sdf = spark.readStream.format("json") \\
        ...     .schema(sdf_schema) \\
        ...     .load(tempfile.mkdtemp())
        >>> json_sdf.isStreaming
        True
        >>> json_sdf.schema == sdf_schema
        True
        """
        if format is not None:
            self.format(format)
        if schema is not None:
            self.schema(schema)
        self.options(**options)
        if path is not None:
            if type(path) != str or len(path.strip()) == 0:
                raise ValueError("If the path is provided for stream, it needs to be a " +
                                 "non-empty string. List of paths are not supported.")
            return self._df(self._jreader.load(path))
        else:
            return self._df(self._jreader.load())

    @since(2.0)
    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
             timestampFormat=None):
        """
        Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
        <http://jsonlines.org/>`_) and returns a :class:`DataFrame`.

        If the ``schema`` parameter is not specified, this function goes
        through the input once to determine the input schema.

        .. note:: Experimental.

        :param path: string representing the path to the JSON dataset.
        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
        :param primitivesAsString: infers all primitive values as a string type. If None is set,
                                   it uses the default value, ``false``.
        :param prefersDecimal: infers all floating-point values as a decimal type. If the values
                               do not fit in decimal, then it infers them as doubles. If None is
                               set, it uses the default value, ``false``.
        :param allowComments: ignores Java/C++ style comments in JSON records. If None is set,
                              it uses the default value, ``false``.
        :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
                                        it uses the default value, ``false``.
        :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
                                  set, it uses the default value, ``true``.
        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
                                        set, it uses the default value, ``false``.
        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
                                                   using backslash quoting mechanism. If None is
                                                   set, it uses the default value, ``false``.
        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                     set, it uses the default value, ``PERMISSIVE``.

                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                  record and puts the malformed string into a new field configured by \
                  ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
                  ``null`` for extra fields.
                * ``DROPMALFORMED`` : ignores the whole corrupted records.
                * ``FAILFAST`` : throws an exception when it meets corrupted records.

        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
                                          created by ``PERMISSIVE`` mode. This overrides
                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                          it uses the value specified in
                                          ``spark.sql.columnNameOfCorruptRecord``.
        :param dateFormat: sets the string that indicates a date format. Custom date formats
                           follow the formats at ``java.text.SimpleDateFormat``. This
                           applies to date type. If None is set, it uses the
                           default value, ``yyyy-MM-dd``.
        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                formats follow the formats at ``java.text.SimpleDateFormat``.
                                This applies to timestamp type. If None is set, it uses the
                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.

        >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
        >>> json_sdf.isStreaming
        True
        >>> json_sdf.schema == sdf_schema
        True
        """
        self._set_opts(
            schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
            allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
            allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
            allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
            timestampFormat=timestampFormat)
        if isinstance(path, basestring):
            return self._df(self._jreader.json(path))
        else:
            raise TypeError("path can be only a single string")

    @since(2.0)
    def parquet(self, path):
        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

        You can set the following Parquet-specific option(s) for reading Parquet files:
            * ``mergeSchema``: sets whether we should merge schemas collected from all \
              Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
              The default value is specified in ``spark.sql.parquet.mergeSchema``.

        .. note:: Experimental.

        >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
        >>> parquet_sdf.isStreaming
        True
        >>> parquet_sdf.schema == sdf_schema
        True
        """
        if isinstance(path, basestring):
            return self._df(self._jreader.parquet(path))
        else:
            raise TypeError("path can be only a single string")

    @ignore_unicode_prefix
    @since(2.0)
    def text(self, path):
        """
        Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
        string column named "value", and followed by partitioned columns if there
        are any.

        Each line in the text file is a new row in the resulting DataFrame.

        .. note:: Experimental.

        :param path: string, or list of strings, for input path(s).

        >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
        >>> text_sdf.isStreaming
        True
        >>> "value" in str(text_sdf.schema)
        True
        """
        if isinstance(path, basestring):
            return self._df(self._jreader.text(path))
        else:
            raise TypeError("path can be only a single string")

    @since(2.0)
    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
            comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
            ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
        """Loads a CSV file stream and returns the result as a :class:`DataFrame`.

        This function will go through the input once to determine the input schema if
        ``inferSchema`` is enabled. To avoid going through the entire data once, disable
        ``inferSchema`` option or specify the schema explicitly using ``schema``.

        .. note:: Experimental.

        :param path: string, or list of strings, for input path(s).
        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
        :param sep: sets the single character as a separator for each field and value.
                    If None is set, it uses the default value, ``,``.
        :param encoding: decodes the CSV files by the given encoding type. If None is set,
                         it uses the default value, ``UTF-8``.
        :param quote: sets the single character used for escaping quoted values where the
                      separator can be part of the value. If None is set, it uses the default
                      value, ``"``. If you would like to turn off quotations, you need to set an
                      empty string.
        :param escape: sets the single character used for escaping quotes inside an already
                       quoted value. If None is set, it uses the default value, ``\``.
        :param comment: sets the single character used for skipping lines beginning with this
                        character. By default (None), it is disabled.
        :param header: uses the first line as names of columns. If None is set, it uses the
                       default value, ``false``.
        :param inferSchema: infers the input schema automatically from data. It requires one extra
                            pass over the data. If None is set, it uses the default value,
                            ``false``.
        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
                                        being read should be skipped. If None is set, it uses
                                        the default value, ``false``.
        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
                                         being read should be skipped. If None is set, it uses
                                         the default value, ``false``.
        :param nullValue: sets the string representation of a null value. If None is set, it uses
                          the default value, empty string. Since 2.0.1, this ``nullValue`` param
                          applies to all supported types including the string type.
        :param nanValue: sets the string representation of a non-number value. If None is set, it
                         uses the default value, ``NaN``.
        :param positiveInf: sets the string representation of a positive infinity value. If None
                            is set, it uses the default value, ``Inf``.
        :param negativeInf: sets the string representation of a negative infinity value. If None
                            is set, it uses the default value, ``-Inf``.
        :param dateFormat: sets the string that indicates a date format. Custom date formats
                           follow the formats at ``java.text.SimpleDateFormat``. This
                           applies to date type. If None is set, it uses the
                           default value, ``yyyy-MM-dd``.
        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                formats follow the formats at ``java.text.SimpleDateFormat``.
                                This applies to timestamp type. If None is set, it uses the
                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
        :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                           set, it uses the default value, ``20480``.
        :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                  value being read. If None is set, it uses the default value,
                                  ``-1`` meaning unlimited length.
        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
                                            log for each partition. Malformed records beyond this
                                            number will be ignored. If None is set, it uses the
                                            default value, ``10``.
        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                     set, it uses the default value, ``PERMISSIVE``.

                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                  When a schema is set by user, it sets ``null`` for extra fields.
                * ``DROPMALFORMED`` : ignores the whole corrupted records.
                * ``FAILFAST`` : throws an exception when it meets corrupted records.

        >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
        >>> csv_sdf.isStreaming
        True
        >>> csv_sdf.schema == sdf_schema
        True
        """
        self._set_opts(
            schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
            header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
            nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
            maxCharsPerColumn=maxCharsPerColumn,
            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
        if isinstance(path, basestring):
            return self._df(self._jreader.csv(path))
        else:
            raise TypeError("path can be only a single string")
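

# Illustrative sketch (not part of the public API and never called from this
# module): composing a DataStreamReader. The directory path and the schema
# below are assumptions made for illustration only.
def _example_read_csv_stream(spark):
    schema = StructType([StructField("name", StringType(), True),
                         StructField("age", IntegerType(), True)])
    # Each builder call returns the reader, so the chain below is equivalent
    # to calling format/schema/option/load separately.
    return (spark.readStream
            .format("csv")
            .schema(schema)
            .option("header", "true")
            .load("/tmp/example_csv_dir"))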


class DataStreamWriter(object):
    """
    Interface used to write a streaming :class:`DataFrame` to external storage systems
    (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
    to access this.

    .. note:: Experimental.

    .. versionadded:: 2.0
    """

    def __init__(self, df):
        self._df = df
        self._spark = df.sql_ctx
        self._jwrite = df._jdf.writeStream()

    def _sq(self, jsq):
        from pyspark.sql.streaming import StreamingQuery
        return StreamingQuery(jsq)

    @since(2.0)
    def outputMode(self, outputMode):
        """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

        Options include:

        * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
          the sink
        * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
          every time there are some updates
        * `update`: Only the rows that were updated in the streaming DataFrame/Dataset will be
          written to the sink every time there are some updates. If the query doesn't contain
          aggregations, it will be equivalent to `append` mode.

        .. note:: Experimental.

        >>> writer = sdf.writeStream.outputMode('append')
        """
        if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
            raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
        self._jwrite = self._jwrite.outputMode(outputMode)
        return self

    @since(2.0)
    def format(self, source):
        """Specifies the underlying output data source.

        .. note:: Experimental.

        :param source: string, name of the data source, which for now can be 'parquet'.

        >>> writer = sdf.writeStream.format('json')
        """
        self._jwrite = self._jwrite.format(source)
        return self

    @since(2.0)
    def option(self, key, value):
        """Adds an output option for the underlying data source.

        .. note:: Experimental.
        """
        self._jwrite = self._jwrite.option(key, to_str(value))
        return self

    @since(2.0)
    def options(self, **options):
        """Adds output options for the underlying data source.

        .. note:: Experimental.
        """
        for k in options:
            self._jwrite = self._jwrite.option(k, to_str(options[k]))
        return self

    @since(2.0)
    def partitionBy(self, *cols):
        """Partitions the output by the given columns on the file system.

        If specified, the output is laid out on the file system similar
        to Hive's partitioning scheme.

        .. note:: Experimental.

        :param cols: name of columns

        """
        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
            cols = cols[0]
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(2.0)
    def queryName(self, queryName):
        """Specifies the name of the :class:`StreamingQuery` that can be started with
        :func:`start`. This name must be unique among all the currently active queries
        in the associated SparkSession.

        .. note:: Experimental.

        :param queryName: unique name for the query

        >>> writer = sdf.writeStream.queryName('streaming_query')
        """
        if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
            raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
        self._jwrite = self._jwrite.queryName(queryName)
        return self

    @keyword_only
    @since(2.0)
    def trigger(self, processingTime=None):
        """Set the trigger for the stream query. If this is not set it will run the query as fast
        as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.

        .. note:: Experimental.

        :param processingTime: a processing time interval as a string, e.g. '5 seconds',
                               '1 minute'.

        >>> # trigger the query for execution every 5 seconds
        >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
        """
        from pyspark.sql.streaming import ProcessingTime
        trigger = None
        if processingTime is not None:
            if type(processingTime) != str or len(processingTime.strip()) == 0:
                raise ValueError('The processing time must be a non empty string. Got: %s' %
                                 processingTime)
            trigger = ProcessingTime(processingTime)
        if trigger is None:
            raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
        self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
        return self

    @ignore_unicode_prefix
    @since(2.0)
    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
              **options):
        """Streams the contents of the :class:`DataFrame` to a data source.

        The data source is specified by the ``format`` and a set of ``options``.
        If ``format`` is not specified, the default data source configured by
        ``spark.sql.sources.default`` will be used.

        .. note:: Experimental.

        :param path: the path in a Hadoop supported file system
        :param format: the format used to save
        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
                           streaming sink.

            * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
              the sink
            * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the
              sink every time there are some updates
            * `update`: Only the rows that were updated in the streaming DataFrame/Dataset will be
              written to the sink every time there are some updates. If the query doesn't contain
              aggregations, it will be equivalent to `append` mode.
        :param partitionBy: names of partitioning columns
        :param queryName: unique name for the query
        :param options: All other string options. You may want to provide a `checkpointLocation`
                        for most streams, however it is not required for a `memory` stream.

        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sq.isActive
        True
        >>> sq.name
        u'this_query'
        >>> sq.stop()
        >>> sq.isActive
        False
        >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
        ...     queryName='that_query', outputMode="append", format='memory')
        >>> sq.name
        u'that_query'
        >>> sq.isActive
        True
        >>> sq.stop()
        """
        self.options(**options)
        if outputMode is not None:
            self.outputMode(outputMode)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        if format is not None:
            self.format(format)
        if queryName is not None:
            self.queryName(queryName)
        if path is None:
            return self._sq(self._jwrite.start())
        else:
            return self._sq(self._jwrite.start(path))
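

# Illustrative sketch (not part of the public API and never called from this
# module): starting a query with DataStreamWriter. The query name, trigger
# interval, output path and checkpoint directory are assumptions made for
# illustration only.
def _example_start_parquet_sink(sdf):
    # `checkpointLocation` is recommended for file sinks so a restarted query
    # can resume from where it left off (see `start` above).
    return (sdf.writeStream
            .format("parquet")
            .outputMode("append")
            .queryName("example_parquet_sink")
            .trigger(processingTime="10 seconds")
            .option("checkpointLocation", "/tmp/example_checkpoint")
            .option("path", "/tmp/example_parquet_out")
            .start())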


def _test():
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SparkSession, SQLContext
    import pyspark.sql.streaming

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.streaming.__dict__.copy()
    try:
        spark = SparkSession.builder.getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(SparkContext.getOrCreate())

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['spark'] = spark
    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
    globs['sdf'] = \
        spark.readStream.format('text').load('python/test_support/sql/streaming')
    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
    globs['df'] = \
        globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.streaming, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['spark'].stop()

    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()