/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming.state

import java.io.{File, IOException}
import java.net.URI

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.LocalSparkContext._
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester {
  type MapType = mutable.HashMap[UnsafeRow, UnsafeRow]

  import StateStoreCoordinatorSuite._
  import StateStoreSuite._

  private val tempDir = Utils.createTempDir().toString
  private val keySchema = StructType(Seq(StructField("key", StringType, true)))
  private val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))

  before {
    StateStore.stop()
    require(!StateStore.isMaintenanceRunning)
  }

  after {
    StateStore.stop()
    require(!StateStore.isMaintenanceRunning)
  }

  test("get, put, remove, commit, and all data iterator") {
    val provider = newStoreProvider()

    // Verify state before starting a new set of updates
    assert(provider.latestIterator().isEmpty)

    val store = provider.getStore(0)
    assert(!store.hasCommitted)
    intercept[IllegalStateException] {
      store.iterator()
    }
    intercept[IllegalStateException] {
      store.updates()
    }

    // Verify state after updating
    put(store, "a", 1)
    assert(store.numKeys() === 1)
    intercept[IllegalStateException] {
      store.iterator()
    }
    intercept[IllegalStateException] {
      store.updates()
    }
    assert(provider.latestIterator().isEmpty)

    // Make updates, commit and then verify state
    put(store, "b", 2)
    put(store, "aa", 3)
    assert(store.numKeys() === 3)
    remove(store, _.startsWith("a"))
    assert(store.numKeys() === 1)
    assert(store.commit() === 1)

    assert(store.hasCommitted)
    assert(rowsToSet(store.iterator()) === Set("b" -> 2))
    assert(rowsToSet(provider.latestIterator()) === Set("b" -> 2))
    assert(fileExists(provider, version = 1, isSnapshot = false))

    assert(getDataFromFiles(provider) === Set("b" -> 2))

    // Trying to get newer versions should fail
    intercept[Exception] {
      provider.getStore(2)
    }
    intercept[Exception] {
      getDataFromFiles(provider, 2)
    }

    // New updates to a reloaded store should create a new version and leave the old version
    // unchanged
    val reloadedProvider = new HDFSBackedStateStoreProvider(
      store.id, keySchema, valueSchema, StateStoreConf.empty, new Configuration)
    val reloadedStore = reloadedProvider.getStore(1)
    assert(reloadedStore.numKeys() === 1)
    put(reloadedStore, "c", 4)
    assert(reloadedStore.numKeys() === 2)
    assert(reloadedStore.commit() === 2)
    assert(rowsToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4))
    assert(getDataFromFiles(provider) === Set("b" -> 2, "c" -> 4))
    assert(getDataFromFiles(provider, version = 1) === Set("b" -> 2))
    assert(getDataFromFiles(provider, version = 2) === Set("b" -> 2, "c" -> 4))
  }

  test("updates iterator with all combos of updates and removes") {
    val provider = newStoreProvider()
    var currentVersion: Int = 0

    def withStore(body: StateStore => Unit): Unit = {
      val store = provider.getStore(currentVersion)
      body(store)
      currentVersion += 1
    }

    // New data should be seen in updates as a value added, even if a key had multiple updates
    withStore { store =>
      put(store, "a", 1)
      put(store, "aa", 1)
      put(store, "aa", 2)
      store.commit()
      assert(updatesToSet(store.updates()) === Set(Added("a", 1), Added("aa", 2)))
      assert(rowsToSet(store.iterator()) === Set("a" -> 1, "aa" -> 2))
    }

    // Multiple updates to the same key should be collapsed into a single value update
    // Keys that have not been updated should not appear in the updates
    withStore { store =>
      put(store, "a", 4)
      put(store, "a", 6)
      store.commit()
      assert(updatesToSet(store.updates()) === Set(Updated("a", 6)))
      assert(rowsToSet(store.iterator()) === Set("a" -> 6, "aa" -> 2))
    }

    // Keys added, updated and finally removed before commit should not appear in updates
    withStore { store =>
      put(store, "b", 4)     // Added, finally removed
      put(store, "bb", 5)    // Added, updated, finally removed
      put(store, "bb", 6)
      remove(store, _.startsWith("b"))
      store.commit()
      assert(updatesToSet(store.updates()) === Set.empty)
      assert(rowsToSet(store.iterator()) === Set("a" -> 6, "aa" -> 2))
    }

    // Removed data should be seen in updates as a key removed
    // Removed, but re-added data should be seen in updates as a value update
    withStore { store =>
      remove(store, _.startsWith("a"))
      put(store, "a", 10)
      store.commit()
      assert(updatesToSet(store.updates()) === Set(Updated("a", 10), Removed("aa")))
      assert(rowsToSet(store.iterator()) === Set("a" -> 10))
    }
  }

  test("cancel") {
    val provider = newStoreProvider()
    val store = provider.getStore(0)
    put(store, "a", 1)
    store.commit()
    assert(rowsToSet(store.iterator()) === Set("a" -> 1))

    // Aborting the store should not change the data already committed to the files
    val store1 = provider.getStore(1)
    put(store1, "b", 1)
    store1.abort()
    assert(getDataFromFiles(provider) === Set("a" -> 1))
  }

  test("getStore with unexpected versions") {
    val provider = newStoreProvider()

    intercept[IllegalArgumentException] {
      provider.getStore(-1)
    }

    // Prepare some data in the store
    val store = provider.getStore(0)
    put(store, "a", 1)
    assert(store.commit() === 1)
    assert(rowsToSet(store.iterator()) === Set("a" -> 1))

    intercept[IllegalStateException] {
      provider.getStore(2)
    }

    // Update store version with some data
    val store1 = provider.getStore(1)
    put(store1, "b", 1)
    assert(store1.commit() === 2)
    assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
    assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))
  }

  test("snapshotting") {
    val provider = newStoreProvider(minDeltasForSnapshot = 5)

    var currentVersion = 0
    def updateVersionTo(targetVersion: Int): Unit = {
      for (i <- currentVersion + 1 to targetVersion) {
        val store = provider.getStore(currentVersion)
        put(store, "a", i)
        store.commit()
        currentVersion += 1
      }
      require(currentVersion === targetVersion)
    }

    updateVersionTo(2)
    require(getDataFromFiles(provider) === Set("a" -> 2))
    provider.doMaintenance()               // should not generate snapshot files
    assert(getDataFromFiles(provider) === Set("a" -> 2))

    for (i <- 1 to currentVersion) {
      assert(fileExists(provider, i, isSnapshot = false))  // all delta files present
      assert(!fileExists(provider, i, isSnapshot = true))  // no snapshot files present
    }

    // After version 6, snapshotting should generate one snapshot file
    updateVersionTo(6)
    require(getDataFromFiles(provider) === Set("a" -> 6), "store not updated correctly")
    provider.doMaintenance()       // should generate snapshot files

    val snapshotVersion = (0 to 6).find(version => fileExists(provider, version, isSnapshot = true))
    assert(snapshotVersion.nonEmpty, "snapshot file not generated")
    deleteFilesEarlierThanVersion(provider, snapshotVersion.get)
    assert(
      getDataFromFiles(provider, snapshotVersion.get) === Set("a" -> snapshotVersion.get),
      "snapshotting messed up the data of the snapshotted version")
    assert(
      getDataFromFiles(provider) === Set("a" -> 6),
      "snapshotting messed up the data of the final version")

    // After version 20, snapshotting should generate newer snapshot files
    updateVersionTo(20)
    require(getDataFromFiles(provider) === Set("a" -> 20), "store not updated correctly")
    provider.doMaintenance()       // do snapshot

    val latestSnapshotVersion = (0 to 20).filter(version =>
      fileExists(provider, version, isSnapshot = true)).lastOption
    assert(latestSnapshotVersion.nonEmpty, "no snapshot file found")
    assert(latestSnapshotVersion.get > snapshotVersion.get, "newer snapshot not generated")

    deleteFilesEarlierThanVersion(provider, latestSnapshotVersion.get)
    assert(getDataFromFiles(provider) === Set("a" -> 20), "snapshotting messed up the data")
  }

  test("cleaning") {
    val provider = newStoreProvider(minDeltasForSnapshot = 5)

    for (i <- 1 to 20) {
      val store = provider.getStore(i - 1)
      put(store, "a", i)
      store.commit()
      provider.doMaintenance() // do cleanup
    }
    require(
      rowsToSet(provider.latestIterator()) === Set("a" -> 20),
      "store not updated correctly")

    assert(!fileExists(provider, version = 1, isSnapshot = false)) // first file should be deleted

    // last couple of versions should be retrievable
    assert(getDataFromFiles(provider, 20) === Set("a" -> 20))
    assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
  }

  test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
    val conf = new Configuration()
    conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
    conf.set("fs.default.name", "fake:///")

    val provider = newStoreProvider(hadoopConf = conf)
    provider.getStore(0).commit()
    provider.getStore(0).commit()

    // Verify we don't leak temp files
    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
      null, true).asScala.filter(_.getName.startsWith("temp-"))
    assert(tempFiles.isEmpty)
  }

  test("corrupted file handling") {
    val provider = newStoreProvider(minDeltasForSnapshot = 5)
    for (i <- 1 to 6) {
      val store = provider.getStore(i - 1)
      put(store, "a", i)
      store.commit()
      provider.doMaintenance() // do cleanup
    }
    val snapshotVersion = (0 to 10).find(version =>
      fileExists(provider, version, isSnapshot = true)).getOrElse(fail("snapshot file not found"))

    // Corrupt the snapshot file and verify that reading it throws an error
    assert(getDataFromFiles(provider, snapshotVersion) === Set("a" -> snapshotVersion))
    corruptFile(provider, snapshotVersion, isSnapshot = true)
    intercept[Exception] {
      getDataFromFiles(provider, snapshotVersion)
    }

    // Corrupt a delta file and verify that reading it throws an error
    assert(getDataFromFiles(provider, snapshotVersion - 1) === Set("a" -> (snapshotVersion - 1)))
    corruptFile(provider, snapshotVersion - 1, isSnapshot = false)
    intercept[Exception] {
      getDataFromFiles(provider, snapshotVersion - 1)
    }

    // Delete the delta file and verify that reading it throws an error
    deleteFilesEarlierThanVersion(provider, snapshotVersion)
    intercept[Exception] {
      getDataFromFiles(provider, snapshotVersion - 1)
    }
  }

  test("StateStore.get") {
    quietly {
      val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
      val storeId = StateStoreId(dir, 0, 0)
      val storeConf = StateStoreConf.empty
      val hadoopConf = new Configuration()

      // Verify that trying to get incorrect versions throws errors
      intercept[IllegalArgumentException] {
        StateStore.get(storeId, keySchema, valueSchema, -1, storeConf, hadoopConf)
      }
      assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store

      intercept[IllegalStateException] {
        StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf)
      }

      // Increase the version of the store
      val store0 = StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf)
      assert(store0.version === 0)
      put(store0, "a", 1)
      store0.commit()

      assert(StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf).version == 1)
      assert(StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf).version == 0)

      // Verify that you can remove the store and still reload and use it
      StateStore.unload(storeId)
      assert(!StateStore.isLoaded(storeId))

      val store1 = StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf)
      assert(StateStore.isLoaded(storeId))
      put(store1, "a", 2)
      assert(store1.commit() === 2)
      assert(rowsToSet(store1.iterator()) === Set("a" -> 2))
    }
  }

  test("maintenance") {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("test")
      // Make the maintenance thread do snapshots and cleanups very fast
      .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
      .set("spark.rpc.numRetries", "1")
    val opId = 0
    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
    val storeId = StateStoreId(dir, opId, 0)
    val sqlConf = new SQLConf()
    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
    val storeConf = StateStoreConf(sqlConf)
    val hadoopConf = new Configuration()
    val provider = new HDFSBackedStateStoreProvider(
      storeId, keySchema, valueSchema, storeConf, hadoopConf)

    var latestStoreVersion = 0

    def generateStoreVersions(): Unit = {
      for (i <- 1 to 20) {
        val store = StateStore.get(
          storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
        put(store, "a", i)
        store.commit()
        latestStoreVersion += 1
      }
    }

    val timeoutDuration = 60.seconds

    quietly {
      withSpark(new SparkContext(conf)) { sc =>
        withCoordinatorRef(sc) { coordinatorRef =>
          require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")

          // Generate sufficient versions of the store for snapshots
          generateStoreVersions()

          eventually(timeout(timeoutDuration)) {
            // Store should have been reported to the coordinator
            assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")

            // Background maintenance should clean up and generate snapshots
            assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")

            // Some snapshots should have been generated
            val snapshotVersions = (1 to latestStoreVersion).filter { version =>
              fileExists(provider, version, isSnapshot = true)
            }
            assert(snapshotVersions.nonEmpty, "no snapshot file found")
          }

          // Generate more versions such that there is another snapshot and
          // the earliest delta file will be cleaned up
          generateStoreVersions()

          // Earliest delta file should get cleaned up
          eventually(timeout(timeoutDuration)) {
            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
          }

          // If the driver decides to deactivate all instances of the store, then this instance
          // should be unloaded
          coordinatorRef.deactivateInstances(dir)
          eventually(timeout(timeoutDuration)) {
            assert(!StateStore.isLoaded(storeId))
          }

          // Reload the store and verify
          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
          assert(StateStore.isLoaded(storeId))

          // If some other executor loads the store, then this instance should be unloaded
          coordinatorRef.reportActiveInstance(storeId, "other-host", "other-exec")
          eventually(timeout(timeoutDuration)) {
            assert(!StateStore.isLoaded(storeId))
          }

          // Reload the store and verify
          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
          assert(StateStore.isLoaded(storeId))
        }
      }

      // Verify that the instance is unloaded when the SparkContext is stopped
      eventually(timeout(timeoutDuration)) {
        require(SparkEnv.get === null)
        assert(!StateStore.isLoaded(storeId))
        assert(!StateStore.isMaintenanceRunning)
      }
    }
  }

  test("SPARK-18342: commit fails when rename fails") {
    import RenameReturnsFalseFileSystem._
    val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString
    val conf = new Configuration()
    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
    val provider = newStoreProvider(dir = dir, hadoopConf = conf)
    val store = provider.getStore(0)
    put(store, "a", 0)
    val e = intercept[IllegalStateException](store.commit())
    assert(e.getCause.getMessage.contains("Failed to rename"))
  }

  test("SPARK-18416: do not create temp delta file until the store is updated") {
    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
    val storeId = StateStoreId(dir, 0, 0)
    val storeConf = StateStoreConf.empty
    val hadoopConf = new Configuration()
    val deltaFileDir = new File(s"$dir/0/0/")

    def numTempFiles: Int = {
      if (deltaFileDir.exists) {
        deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith("."))
      } else 0
    }

    def numDeltaFiles: Int = {
      if (deltaFileDir.exists) {
        deltaFileDir.listFiles.map(_.getName).count(n => n.contains(".delta") && !n.startsWith("."))
      } else 0
    }

    def shouldNotCreateTempFile[T](body: => T): T = {
      val before = numTempFiles
      val result = body
      assert(numTempFiles === before)
      result
    }

    // Getting the store should not create a temp file
    val store0 = shouldNotCreateTempFile {
      StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf)
    }

    // Put should create a temp file
    put(store0, "a", 1)
    assert(numTempFiles === 1)
    assert(numDeltaFiles === 0)

    // Commit should remove the temp file and create a delta file
    store0.commit()
    assert(numTempFiles === 0)
    assert(numDeltaFiles === 1)

    // Remove should create a temp file
    val store1 = shouldNotCreateTempFile {
      StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf)
    }
    remove(store1, _ == "a")
    assert(numTempFiles === 1)
    assert(numDeltaFiles === 1)

    // Commit should remove the temp file and create a delta file
    store1.commit()
    assert(numTempFiles === 0)
    assert(numDeltaFiles === 2)

    // Commit without any updates should create a delta file
    val store2 = shouldNotCreateTempFile {
      StateStore.get(storeId, keySchema, valueSchema, 2, storeConf, hadoopConf)
    }
    store2.commit()
    assert(numTempFiles === 0)
    assert(numDeltaFiles === 3)
  }

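  /**
   * Read the data for a given version directly from the store files using a fresh provider.
   * A negative version returns the data of the latest version.
   */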
  def getDataFromFiles(
      provider: HDFSBackedStateStoreProvider,
      version: Int = -1): Set[(String, Int)] = {
    val reloadedProvider = new HDFSBackedStateStoreProvider(
      provider.id, keySchema, valueSchema, StateStoreConf.empty, new Configuration)
    if (version < 0) {
      reloadedProvider.latestIterator().map(rowsToStringInt).toSet
    } else {
      reloadedProvider.iterator(version).map(rowsToStringInt).toSet
    }
  }

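  /** Assert that the optional store map is present and matches the expected key-value pairs. */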
  def assertMap(
      testMapOption: Option[MapType],
      expectedMap: Map[String, Int]): Unit = {
    assert(testMapOption.nonEmpty, "no map present")
    val convertedMap = testMapOption.get.map(rowsToStringInt)
    assert(convertedMap === expectedMap)
  }

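  /** Check whether the delta or snapshot file for the given version exists on disk. */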
  def fileExists(
      provider: HDFSBackedStateStoreProvider,
      version: Long,
      isSnapshot: Boolean): Boolean = {
    val method = PrivateMethod[Path]('baseDir)
    val basePath = provider invokePrivate method()
    val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta"
    val filePath = new File(basePath.toString, fileName)
    filePath.exists
  }

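  /** Delete all delta and snapshot files of versions earlier than the given version. */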
  def deleteFilesEarlierThanVersion(provider: HDFSBackedStateStoreProvider, version: Long): Unit = {
    val method = PrivateMethod[Path]('baseDir)
    val basePath = provider invokePrivate method()
    for (v <- 0 until version.toInt) {
      for (isSnapshot <- Seq(false, true)) {
        val fileName = if (isSnapshot) s"$v.snapshot" else s"$v.delta"
        val filePath = new File(basePath.toString, fileName)
        if (filePath.exists) filePath.delete()
      }
    }
  }

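  /** Corrupt the delta or snapshot file of a version by replacing it with an empty file. */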
  def corruptFile(
      provider: HDFSBackedStateStoreProvider,
      version: Long,
      isSnapshot: Boolean): Unit = {
    val method = PrivateMethod[Path]('baseDir)
    val basePath = provider invokePrivate method()
    val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta"
    val filePath = new File(basePath.toString, fileName)
    filePath.delete()
    filePath.createNewFile()
  }

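  /** Check, through the private `loadedStores` map, whether a store is currently loaded. */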
  def storeLoaded(storeId: StateStoreId): Boolean = {
    val method = PrivateMethod[mutable.HashMap[StateStoreId, StateStore]]('loadedStores)
    val loadedStores = StateStore invokePrivate method()
    loadedStores.contains(storeId)
  }

  def unloadStore(storeId: StateStoreId): Boolean = {
    val method = PrivateMethod('remove)
    StateStore invokePrivate method(storeId)
  }

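  /** Create a new store provider that is backed by a fresh temporary directory by default. */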
  def newStoreProvider(
      opId: Long = Random.nextLong,
      partition: Int = 0,
      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
      dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString,
      hadoopConf: Configuration = new Configuration()): HDFSBackedStateStoreProvider = {
    val sqlConf = new SQLConf()
    sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
    new HDFSBackedStateStoreProvider(
      StateStoreId(dir, opId, partition),
      keySchema,
      valueSchema,
      new StateStoreConf(sqlConf),
      hadoopConf)
  }

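  /** Remove all keys whose string form satisfies the given condition. */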
  def remove(store: StateStore, condition: String => Boolean): Unit = {
    store.remove(row => condition(rowToString(row)))
  }

  private def put(store: StateStore, key: String, value: Int): Unit = {
    store.put(stringToRow(key), intToRow(value))
  }

  private def get(store: StateStore, key: String): Option[Int] = {
    store.get(stringToRow(key)).map(rowToInt)
  }
}

private[state] object StateStoreSuite {

  /** Trait and classes mirroring [[StoreUpdate]] for testing the store updates iterator */
  trait TestUpdate
  case class Added(key: String, value: Int) extends TestUpdate
  case class Updated(key: String, value: Int) extends TestUpdate
  case class Removed(key: String) extends TestUpdate

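  // Reusable projections for converting test strings and ints to UnsafeRows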
  val strProj = UnsafeProjection.create(Array[DataType](StringType))
  val intProj = UnsafeProjection.create(Array[DataType](IntegerType))

  def stringToRow(s: String): UnsafeRow = {
    strProj.apply(new GenericInternalRow(Array[Any](UTF8String.fromString(s)))).copy()
  }

  def intToRow(i: Int): UnsafeRow = {
    intProj.apply(new GenericInternalRow(Array[Any](i))).copy()
  }

  def rowToString(row: UnsafeRow): String = {
    row.getUTF8String(0).toString
  }

  def rowToInt(row: UnsafeRow): Int = {
    row.getInt(0)
  }

  def rowsToIntInt(row: (UnsafeRow, UnsafeRow)): (Int, Int) = {
    (rowToInt(row._1), rowToInt(row._2))
  }

  def rowsToStringInt(row: (UnsafeRow, UnsafeRow)): (String, Int) = {
    (rowToString(row._1), rowToInt(row._2))
  }

  def rowsToSet(iterator: Iterator[(UnsafeRow, UnsafeRow)]): Set[(String, Int)] = {
    iterator.map(rowsToStringInt).toSet
  }

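  /** Convert an iterator of [[StoreUpdate]]s into a set of [[TestUpdate]]s for comparison. */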
  def updatesToSet(iterator: Iterator[StoreUpdate]): Set[TestUpdate] = {
    iterator.map {
      case ValueAdded(key, value) => Added(rowToString(key), rowToInt(value))
      case ValueUpdated(key, value) => Updated(rowToString(key), rowToInt(value))
      case ValueRemoved(key, _) => Removed(rowToString(key))
    }.toSet
  }
}

/**
 * Fake FileSystem that simulates HDFS rename semantics, i.e. renaming a file atop an existing
 * one should return false.
 * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
 */
class RenameLikeHDFSFileSystem extends RawLocalFileSystem {
  override def rename(src: Path, dst: Path): Boolean = {
    if (exists(dst)) {
      false
    } else {
      super.rename(src, dst)
    }
  }
}

707
708/**
709 * Fake FileSystem to test that the StateStore throws an exception while committing the
710 * delta file, when `fs.rename` returns `false`.
711 */
712class RenameReturnsFalseFileSystem extends RawLocalFileSystem {
713  import RenameReturnsFalseFileSystem._
714  override def getUri: URI = {
715    URI.create(s"$scheme:///")
716  }
717
718  override def rename(src: Path, dst: Path): Boolean = false
719}
720
721object RenameReturnsFalseFileSystem {
722  val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs"
723}
724