1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.sql.execution.streaming.state 19 20import java.io.{File, IOException} 21import java.net.URI 22 23import scala.collection.JavaConverters._ 24import scala.collection.mutable 25import scala.util.Random 26 27import org.apache.commons.io.FileUtils 28import org.apache.hadoop.conf.Configuration 29import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} 30import org.scalatest.{BeforeAndAfter, PrivateMethodTester} 31import org.scalatest.concurrent.Eventually._ 32import org.scalatest.time.SpanSugar._ 33 34import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} 35import org.apache.spark.LocalSparkContext._ 36import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} 37import org.apache.spark.sql.catalyst.util.quietly 38import org.apache.spark.sql.internal.SQLConf 39import org.apache.spark.sql.types._ 40import org.apache.spark.unsafe.types.UTF8String 41import org.apache.spark.util.Utils 42 43class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester { 44 type MapType = mutable.HashMap[UnsafeRow, UnsafeRow] 45 46 import StateStoreCoordinatorSuite._ 47 import StateStoreSuite._ 48 49 private val tempDir = Utils.createTempDir().toString 50 private val keySchema = StructType(Seq(StructField("key", StringType, true))) 51 private val valueSchema = StructType(Seq(StructField("value", IntegerType, true))) 52 53 before { 54 StateStore.stop() 55 require(!StateStore.isMaintenanceRunning) 56 } 57 58 after { 59 StateStore.stop() 60 require(!StateStore.isMaintenanceRunning) 61 } 62 63 test("get, put, remove, commit, and all data iterator") { 64 val provider = newStoreProvider() 65 66 // Verify state before starting a new set of updates 67 assert(provider.latestIterator().isEmpty) 68 69 val store = provider.getStore(0) 70 assert(!store.hasCommitted) 71 intercept[IllegalStateException] { 72 store.iterator() 73 } 74 intercept[IllegalStateException] { 75 store.updates() 76 } 77 78 // Verify state after updating 79 put(store, "a", 1) 80 assert(store.numKeys() === 1) 81 intercept[IllegalStateException] { 82 store.iterator() 83 } 84 intercept[IllegalStateException] { 85 store.updates() 86 } 87 assert(provider.latestIterator().isEmpty) 88 89 // Make updates, commit and then verify state 90 put(store, "b", 2) 91 put(store, "aa", 3) 92 assert(store.numKeys() === 3) 93 remove(store, _.startsWith("a")) 94 assert(store.numKeys() === 1) 95 assert(store.commit() === 1) 96 97 assert(store.hasCommitted) 98 assert(rowsToSet(store.iterator()) === Set("b" -> 2)) 99 assert(rowsToSet(provider.latestIterator()) === Set("b" -> 2)) 100 assert(fileExists(provider, version = 1, isSnapshot = false)) 101 102 assert(getDataFromFiles(provider) === Set("b" -> 2)) 103 104 // Trying to get newer versions should fail 105 intercept[Exception] { 106 provider.getStore(2) 107 } 108 intercept[Exception] { 109 getDataFromFiles(provider, 2) 110 } 111 112 // New updates to the reloaded store with new version, and does not change old version 113 val reloadedProvider = new HDFSBackedStateStoreProvider( 114 store.id, keySchema, valueSchema, StateStoreConf.empty, new Configuration) 115 val reloadedStore = reloadedProvider.getStore(1) 116 assert(reloadedStore.numKeys() === 1) 117 put(reloadedStore, "c", 4) 118 assert(reloadedStore.numKeys() === 2) 119 assert(reloadedStore.commit() === 2) 120 assert(rowsToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4)) 121 assert(getDataFromFiles(provider) === Set("b" -> 2, "c" -> 4)) 122 assert(getDataFromFiles(provider, version = 1) === Set("b" -> 2)) 123 assert(getDataFromFiles(provider, version = 2) === Set("b" -> 2, "c" -> 4)) 124 } 125 126 test("updates iterator with all combos of updates and removes") { 127 val provider = newStoreProvider() 128 var currentVersion: Int = 0 129 130 def withStore(body: StateStore => Unit): Unit = { 131 val store = provider.getStore(currentVersion) 132 body(store) 133 currentVersion += 1 134 } 135 136 // New data should be seen in updates as value added, even if they had multiple updates 137 withStore { store => 138 put(store, "a", 1) 139 put(store, "aa", 1) 140 put(store, "aa", 2) 141 store.commit() 142 assert(updatesToSet(store.updates()) === Set(Added("a", 1), Added("aa", 2))) 143 assert(rowsToSet(store.iterator()) === Set("a" -> 1, "aa" -> 2)) 144 } 145 146 // Multiple updates to same key should be collapsed in the updates as a single value update 147 // Keys that have not been updated should not appear in the updates 148 withStore { store => 149 put(store, "a", 4) 150 put(store, "a", 6) 151 store.commit() 152 assert(updatesToSet(store.updates()) === Set(Updated("a", 6))) 153 assert(rowsToSet(store.iterator()) === Set("a" -> 6, "aa" -> 2)) 154 } 155 156 // Keys added, updated and finally removed before commit should not appear in updates 157 withStore { store => 158 put(store, "b", 4) // Added, finally removed 159 put(store, "bb", 5) // Added, updated, finally removed 160 put(store, "bb", 6) 161 remove(store, _.startsWith("b")) 162 store.commit() 163 assert(updatesToSet(store.updates()) === Set.empty) 164 assert(rowsToSet(store.iterator()) === Set("a" -> 6, "aa" -> 2)) 165 } 166 167 // Removed data should be seen in updates as a key removed 168 // Removed, but re-added data should be seen in updates as a value update 169 withStore { store => 170 remove(store, _.startsWith("a")) 171 put(store, "a", 10) 172 store.commit() 173 assert(updatesToSet(store.updates()) === Set(Updated("a", 10), Removed("aa"))) 174 assert(rowsToSet(store.iterator()) === Set("a" -> 10)) 175 } 176 } 177 178 test("cancel") { 179 val provider = newStoreProvider() 180 val store = provider.getStore(0) 181 put(store, "a", 1) 182 store.commit() 183 assert(rowsToSet(store.iterator()) === Set("a" -> 1)) 184 185 // cancelUpdates should not change the data in the files 186 val store1 = provider.getStore(1) 187 put(store1, "b", 1) 188 store1.abort() 189 assert(getDataFromFiles(provider) === Set("a" -> 1)) 190 } 191 192 test("getStore with unexpected versions") { 193 val provider = newStoreProvider() 194 195 intercept[IllegalArgumentException] { 196 provider.getStore(-1) 197 } 198 199 // Prepare some data in the store 200 val store = provider.getStore(0) 201 put(store, "a", 1) 202 assert(store.commit() === 1) 203 assert(rowsToSet(store.iterator()) === Set("a" -> 1)) 204 205 intercept[IllegalStateException] { 206 provider.getStore(2) 207 } 208 209 // Update store version with some data 210 val store1 = provider.getStore(1) 211 put(store1, "b", 1) 212 assert(store1.commit() === 2) 213 assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1)) 214 assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1)) 215 } 216 217 test("snapshotting") { 218 val provider = newStoreProvider(minDeltasForSnapshot = 5) 219 220 var currentVersion = 0 221 def updateVersionTo(targetVersion: Int): Unit = { 222 for (i <- currentVersion + 1 to targetVersion) { 223 val store = provider.getStore(currentVersion) 224 put(store, "a", i) 225 store.commit() 226 currentVersion += 1 227 } 228 require(currentVersion === targetVersion) 229 } 230 231 updateVersionTo(2) 232 require(getDataFromFiles(provider) === Set("a" -> 2)) 233 provider.doMaintenance() // should not generate snapshot files 234 assert(getDataFromFiles(provider) === Set("a" -> 2)) 235 236 for (i <- 1 to currentVersion) { 237 assert(fileExists(provider, i, isSnapshot = false)) // all delta files present 238 assert(!fileExists(provider, i, isSnapshot = true)) // no snapshot files present 239 } 240 241 // After version 6, snapshotting should generate one snapshot file 242 updateVersionTo(6) 243 require(getDataFromFiles(provider) === Set("a" -> 6), "store not updated correctly") 244 provider.doMaintenance() // should generate snapshot files 245 246 val snapshotVersion = (0 to 6).find(version => fileExists(provider, version, isSnapshot = true)) 247 assert(snapshotVersion.nonEmpty, "snapshot file not generated") 248 deleteFilesEarlierThanVersion(provider, snapshotVersion.get) 249 assert( 250 getDataFromFiles(provider, snapshotVersion.get) === Set("a" -> snapshotVersion.get), 251 "snapshotting messed up the data of the snapshotted version") 252 assert( 253 getDataFromFiles(provider) === Set("a" -> 6), 254 "snapshotting messed up the data of the final version") 255 256 // After version 20, snapshotting should generate newer snapshot files 257 updateVersionTo(20) 258 require(getDataFromFiles(provider) === Set("a" -> 20), "store not updated correctly") 259 provider.doMaintenance() // do snapshot 260 261 val latestSnapshotVersion = (0 to 20).filter(version => 262 fileExists(provider, version, isSnapshot = true)).lastOption 263 assert(latestSnapshotVersion.nonEmpty, "no snapshot file found") 264 assert(latestSnapshotVersion.get > snapshotVersion.get, "newer snapshot not generated") 265 266 deleteFilesEarlierThanVersion(provider, latestSnapshotVersion.get) 267 assert(getDataFromFiles(provider) === Set("a" -> 20), "snapshotting messed up the data") 268 } 269 270 test("cleaning") { 271 val provider = newStoreProvider(minDeltasForSnapshot = 5) 272 273 for (i <- 1 to 20) { 274 val store = provider.getStore(i - 1) 275 put(store, "a", i) 276 store.commit() 277 provider.doMaintenance() // do cleanup 278 } 279 require( 280 rowsToSet(provider.latestIterator()) === Set("a" -> 20), 281 "store not updated correctly") 282 283 assert(!fileExists(provider, version = 1, isSnapshot = false)) // first file should be deleted 284 285 // last couple of versions should be retrievable 286 assert(getDataFromFiles(provider, 20) === Set("a" -> 20)) 287 assert(getDataFromFiles(provider, 19) === Set("a" -> 19)) 288 } 289 290 test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") { 291 val conf = new Configuration() 292 conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName) 293 conf.set("fs.default.name", "fake:///") 294 295 val provider = newStoreProvider(hadoopConf = conf) 296 provider.getStore(0).commit() 297 provider.getStore(0).commit() 298 299 // Verify we don't leak temp files 300 val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation), 301 null, true).asScala.filter(_.getName.startsWith("temp-")) 302 assert(tempFiles.isEmpty) 303 } 304 305 test("corrupted file handling") { 306 val provider = newStoreProvider(minDeltasForSnapshot = 5) 307 for (i <- 1 to 6) { 308 val store = provider.getStore(i - 1) 309 put(store, "a", i) 310 store.commit() 311 provider.doMaintenance() // do cleanup 312 } 313 val snapshotVersion = (0 to 10).find( version => 314 fileExists(provider, version, isSnapshot = true)).getOrElse(fail("snapshot file not found")) 315 316 // Corrupt snapshot file and verify that it throws error 317 assert(getDataFromFiles(provider, snapshotVersion) === Set("a" -> snapshotVersion)) 318 corruptFile(provider, snapshotVersion, isSnapshot = true) 319 intercept[Exception] { 320 getDataFromFiles(provider, snapshotVersion) 321 } 322 323 // Corrupt delta file and verify that it throws error 324 assert(getDataFromFiles(provider, snapshotVersion - 1) === Set("a" -> (snapshotVersion - 1))) 325 corruptFile(provider, snapshotVersion - 1, isSnapshot = false) 326 intercept[Exception] { 327 getDataFromFiles(provider, snapshotVersion - 1) 328 } 329 330 // Delete delta file and verify that it throws error 331 deleteFilesEarlierThanVersion(provider, snapshotVersion) 332 intercept[Exception] { 333 getDataFromFiles(provider, snapshotVersion - 1) 334 } 335 } 336 337 test("StateStore.get") { 338 quietly { 339 val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString 340 val storeId = StateStoreId(dir, 0, 0) 341 val storeConf = StateStoreConf.empty 342 val hadoopConf = new Configuration() 343 344 345 // Verify that trying to get incorrect versions throw errors 346 intercept[IllegalArgumentException] { 347 StateStore.get(storeId, keySchema, valueSchema, -1, storeConf, hadoopConf) 348 } 349 assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store 350 351 intercept[IllegalStateException] { 352 StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf) 353 } 354 355 // Increase version of the store 356 val store0 = StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf) 357 assert(store0.version === 0) 358 put(store0, "a", 1) 359 store0.commit() 360 361 assert(StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf).version == 1) 362 assert(StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf).version == 0) 363 364 // Verify that you can remove the store and still reload and use it 365 StateStore.unload(storeId) 366 assert(!StateStore.isLoaded(storeId)) 367 368 val store1 = StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf) 369 assert(StateStore.isLoaded(storeId)) 370 put(store1, "a", 2) 371 assert(store1.commit() === 2) 372 assert(rowsToSet(store1.iterator()) === Set("a" -> 2)) 373 } 374 } 375 376 test("maintenance") { 377 val conf = new SparkConf() 378 .setMaster("local") 379 .setAppName("test") 380 // Make maintenance thread do snapshots and cleanups very fast 381 .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms") 382 // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly' 383 // fails to talk to the StateStoreCoordinator and unloads all the StateStores 384 .set("spark.rpc.numRetries", "1") 385 val opId = 0 386 val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString 387 val storeId = StateStoreId(dir, opId, 0) 388 val sqlConf = new SQLConf() 389 sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2) 390 val storeConf = StateStoreConf(sqlConf) 391 val hadoopConf = new Configuration() 392 val provider = new HDFSBackedStateStoreProvider( 393 storeId, keySchema, valueSchema, storeConf, hadoopConf) 394 395 var latestStoreVersion = 0 396 397 def generateStoreVersions() { 398 for (i <- 1 to 20) { 399 val store = StateStore.get( 400 storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf) 401 put(store, "a", i) 402 store.commit() 403 latestStoreVersion += 1 404 } 405 } 406 407 val timeoutDuration = 60 seconds 408 409 quietly { 410 withSpark(new SparkContext(conf)) { sc => 411 withCoordinatorRef(sc) { coordinatorRef => 412 require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running") 413 414 // Generate sufficient versions of store for snapshots 415 generateStoreVersions() 416 417 eventually(timeout(timeoutDuration)) { 418 // Store should have been reported to the coordinator 419 assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported") 420 421 // Background maintenance should clean up and generate snapshots 422 assert(StateStore.isMaintenanceRunning, "Maintenance task is not running") 423 424 // Some snapshots should have been generated 425 val snapshotVersions = (1 to latestStoreVersion).filter { version => 426 fileExists(provider, version, isSnapshot = true) 427 } 428 assert(snapshotVersions.nonEmpty, "no snapshot file found") 429 } 430 431 // Generate more versions such that there is another snapshot and 432 // the earliest delta file will be cleaned up 433 generateStoreVersions() 434 435 // Earliest delta file should get cleaned up 436 eventually(timeout(timeoutDuration)) { 437 assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted") 438 } 439 440 // If driver decides to deactivate all instances of the store, then this instance 441 // should be unloaded 442 coordinatorRef.deactivateInstances(dir) 443 eventually(timeout(timeoutDuration)) { 444 assert(!StateStore.isLoaded(storeId)) 445 } 446 447 // Reload the store and verify 448 StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf) 449 assert(StateStore.isLoaded(storeId)) 450 451 // If some other executor loads the store, then this instance should be unloaded 452 coordinatorRef.reportActiveInstance(storeId, "other-host", "other-exec") 453 eventually(timeout(timeoutDuration)) { 454 assert(!StateStore.isLoaded(storeId)) 455 } 456 457 // Reload the store and verify 458 StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf) 459 assert(StateStore.isLoaded(storeId)) 460 } 461 } 462 463 // Verify if instance is unloaded if SparkContext is stopped 464 eventually(timeout(timeoutDuration)) { 465 require(SparkEnv.get === null) 466 assert(!StateStore.isLoaded(storeId)) 467 assert(!StateStore.isMaintenanceRunning) 468 } 469 } 470 } 471 472 test("SPARK-18342: commit fails when rename fails") { 473 import RenameReturnsFalseFileSystem._ 474 val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString 475 val conf = new Configuration() 476 conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) 477 val provider = newStoreProvider(dir = dir, hadoopConf = conf) 478 val store = provider.getStore(0) 479 put(store, "a", 0) 480 val e = intercept[IllegalStateException](store.commit()) 481 assert(e.getCause.getMessage.contains("Failed to rename")) 482 } 483 484 test("SPARK-18416: do not create temp delta file until the store is updated") { 485 val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString 486 val storeId = StateStoreId(dir, 0, 0) 487 val storeConf = StateStoreConf.empty 488 val hadoopConf = new Configuration() 489 val deltaFileDir = new File(s"$dir/0/0/") 490 491 def numTempFiles: Int = { 492 if (deltaFileDir.exists) { 493 deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith(".")) 494 } else 0 495 } 496 497 def numDeltaFiles: Int = { 498 if (deltaFileDir.exists) { 499 deltaFileDir.listFiles.map(_.getName).count(n => n.contains(".delta") && !n.startsWith(".")) 500 } else 0 501 } 502 503 def shouldNotCreateTempFile[T](body: => T): T = { 504 val before = numTempFiles 505 val result = body 506 assert(numTempFiles === before) 507 result 508 } 509 510 // Getting the store should not create temp file 511 val store0 = shouldNotCreateTempFile { 512 StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf) 513 } 514 515 // Put should create a temp file 516 put(store0, "a", 1) 517 assert(numTempFiles === 1) 518 assert(numDeltaFiles === 0) 519 520 // Commit should remove temp file and create a delta file 521 store0.commit() 522 assert(numTempFiles === 0) 523 assert(numDeltaFiles === 1) 524 525 // Remove should create a temp file 526 val store1 = shouldNotCreateTempFile { 527 StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf) 528 } 529 remove(store1, _ == "a") 530 assert(numTempFiles === 1) 531 assert(numDeltaFiles === 1) 532 533 // Commit should remove temp file and create a delta file 534 store1.commit() 535 assert(numTempFiles === 0) 536 assert(numDeltaFiles === 2) 537 538 // Commit without any updates should create a delta file 539 val store2 = shouldNotCreateTempFile { 540 StateStore.get(storeId, keySchema, valueSchema, 2, storeConf, hadoopConf) 541 } 542 store2.commit() 543 assert(numTempFiles === 0) 544 assert(numDeltaFiles === 3) 545 } 546 547 def getDataFromFiles( 548 provider: HDFSBackedStateStoreProvider, 549 version: Int = -1): Set[(String, Int)] = { 550 val reloadedProvider = new HDFSBackedStateStoreProvider( 551 provider.id, keySchema, valueSchema, StateStoreConf.empty, new Configuration) 552 if (version < 0) { 553 reloadedProvider.latestIterator().map(rowsToStringInt).toSet 554 } else { 555 reloadedProvider.iterator(version).map(rowsToStringInt).toSet 556 } 557 } 558 559 def assertMap( 560 testMapOption: Option[MapType], 561 expectedMap: Map[String, Int]): Unit = { 562 assert(testMapOption.nonEmpty, "no map present") 563 val convertedMap = testMapOption.get.map(rowsToStringInt) 564 assert(convertedMap === expectedMap) 565 } 566 567 def fileExists( 568 provider: HDFSBackedStateStoreProvider, 569 version: Long, 570 isSnapshot: Boolean): Boolean = { 571 val method = PrivateMethod[Path]('baseDir) 572 val basePath = provider invokePrivate method() 573 val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta" 574 val filePath = new File(basePath.toString, fileName) 575 filePath.exists 576 } 577 578 def deleteFilesEarlierThanVersion(provider: HDFSBackedStateStoreProvider, version: Long): Unit = { 579 val method = PrivateMethod[Path]('baseDir) 580 val basePath = provider invokePrivate method() 581 for (version <- 0 until version.toInt) { 582 for (isSnapshot <- Seq(false, true)) { 583 val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta" 584 val filePath = new File(basePath.toString, fileName) 585 if (filePath.exists) filePath.delete() 586 } 587 } 588 } 589 590 def corruptFile( 591 provider: HDFSBackedStateStoreProvider, 592 version: Long, 593 isSnapshot: Boolean): Unit = { 594 val method = PrivateMethod[Path]('baseDir) 595 val basePath = provider invokePrivate method() 596 val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta" 597 val filePath = new File(basePath.toString, fileName) 598 filePath.delete() 599 filePath.createNewFile() 600 } 601 602 def storeLoaded(storeId: StateStoreId): Boolean = { 603 val method = PrivateMethod[mutable.HashMap[StateStoreId, StateStore]]('loadedStores) 604 val loadedStores = StateStore invokePrivate method() 605 loadedStores.contains(storeId) 606 } 607 608 def unloadStore(storeId: StateStoreId): Boolean = { 609 val method = PrivateMethod('remove) 610 StateStore invokePrivate method(storeId) 611 } 612 613 def newStoreProvider( 614 opId: Long = Random.nextLong, 615 partition: Int = 0, 616 minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, 617 dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString, 618 hadoopConf: Configuration = new Configuration() 619 ): HDFSBackedStateStoreProvider = { 620 val sqlConf = new SQLConf() 621 sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) 622 sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2) 623 new HDFSBackedStateStoreProvider( 624 StateStoreId(dir, opId, partition), 625 keySchema, 626 valueSchema, 627 new StateStoreConf(sqlConf), 628 hadoopConf) 629 } 630 631 def remove(store: StateStore, condition: String => Boolean): Unit = { 632 store.remove(row => condition(rowToString(row))) 633 } 634 635 private def put(store: StateStore, key: String, value: Int): Unit = { 636 store.put(stringToRow(key), intToRow(value)) 637 } 638 639 private def get(store: StateStore, key: String): Option[Int] = { 640 store.get(stringToRow(key)).map(rowToInt) 641 } 642} 643 644private[state] object StateStoreSuite { 645 646 /** Trait and classes mirroring [[StoreUpdate]] for testing store updates iterator */ 647 trait TestUpdate 648 case class Added(key: String, value: Int) extends TestUpdate 649 case class Updated(key: String, value: Int) extends TestUpdate 650 case class Removed(key: String) extends TestUpdate 651 652 val strProj = UnsafeProjection.create(Array[DataType](StringType)) 653 val intProj = UnsafeProjection.create(Array[DataType](IntegerType)) 654 655 def stringToRow(s: String): UnsafeRow = { 656 strProj.apply(new GenericInternalRow(Array[Any](UTF8String.fromString(s)))).copy() 657 } 658 659 def intToRow(i: Int): UnsafeRow = { 660 intProj.apply(new GenericInternalRow(Array[Any](i))).copy() 661 } 662 663 def rowToString(row: UnsafeRow): String = { 664 row.getUTF8String(0).toString 665 } 666 667 def rowToInt(row: UnsafeRow): Int = { 668 row.getInt(0) 669 } 670 671 def rowsToIntInt(row: (UnsafeRow, UnsafeRow)): (Int, Int) = { 672 (rowToInt(row._1), rowToInt(row._2)) 673 } 674 675 676 def rowsToStringInt(row: (UnsafeRow, UnsafeRow)): (String, Int) = { 677 (rowToString(row._1), rowToInt(row._2)) 678 } 679 680 def rowsToSet(iterator: Iterator[(UnsafeRow, UnsafeRow)]): Set[(String, Int)] = { 681 iterator.map(rowsToStringInt).toSet 682 } 683 684 def updatesToSet(iterator: Iterator[StoreUpdate]): Set[TestUpdate] = { 685 iterator.map { 686 case ValueAdded(key, value) => Added(rowToString(key), rowToInt(value)) 687 case ValueUpdated(key, value) => Updated(rowToString(key), rowToInt(value)) 688 case ValueRemoved(key, _) => Removed(rowToString(key)) 689 }.toSet 690 } 691} 692 693/** 694 * Fake FileSystem that simulates HDFS rename semantic, i.e. renaming a file atop an existing 695 * one should return false. 696 * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html 697 */ 698class RenameLikeHDFSFileSystem extends RawLocalFileSystem { 699 override def rename(src: Path, dst: Path): Boolean = { 700 if (exists(dst)) { 701 return false 702 } else { 703 return super.rename(src, dst) 704 } 705 } 706} 707 708/** 709 * Fake FileSystem to test that the StateStore throws an exception while committing the 710 * delta file, when `fs.rename` returns `false`. 711 */ 712class RenameReturnsFalseFileSystem extends RawLocalFileSystem { 713 import RenameReturnsFalseFileSystem._ 714 override def getUri: URI = { 715 URI.create(s"$scheme:///") 716 } 717 718 override def rename(src: Path, dst: Path): Boolean = false 719} 720 721object RenameReturnsFalseFileSystem { 722 val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs" 723} 724