1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 package org.rocksdb; 6 7 import org.junit.After; 8 import org.junit.Before; 9 import org.junit.Rule; 10 import org.junit.Test; 11 import org.junit.rules.TemporaryFolder; 12 import org.junit.runner.RunWith; 13 import org.junit.runners.Parameterized; 14 import org.junit.runners.Parameterized.Parameter; 15 import org.junit.runners.Parameterized.Parameters; 16 17 import java.nio.ByteBuffer; 18 import java.util.*; 19 import java.util.concurrent.*; 20 21 @RunWith(Parameterized.class) 22 public class WriteBatchThreadedTest { 23 24 @Parameters(name = "WriteBatchThreadedTest(threadCount={0})") data()25 public static Iterable<Integer> data() { 26 return Arrays.asList(new Integer[]{1, 10, 50, 100}); 27 } 28 29 @Parameter 30 public int threadCount; 31 32 @Rule 33 public TemporaryFolder dbFolder = new TemporaryFolder(); 34 35 RocksDB db; 36 37 @Before setUp()38 public void setUp() throws Exception { 39 RocksDB.loadLibrary(); 40 final Options options = new Options() 41 .setCreateIfMissing(true) 42 .setIncreaseParallelism(32); 43 db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); 44 assert (db != null); 45 } 46 47 @After tearDown()48 public void tearDown() throws Exception { 49 if (db != null) { 50 db.close(); 51 } 52 } 53 54 @Test threadedWrites()55 public void threadedWrites() throws InterruptedException, ExecutionException { 56 final List<Callable<Void>> callables = new ArrayList<>(); 57 for (int i = 0; i < 100; i++) { 58 final int offset = i * 100; 59 callables.add(new Callable<Void>() { 60 @Override 61 public Void call() throws RocksDBException { 62 try (final WriteBatch wb = new WriteBatch(); 63 final WriteOptions w_opt = new WriteOptions()) { 64 for (int i = offset; i < offset + 100; i++) { 65 wb.put(ByteBuffer.allocate(4).putInt(i).array(), "parallel rocks test".getBytes()); 66 } 67 db.write(w_opt, wb); 68 } 69 return null; 70 } 71 }); 72 } 73 74 //submit the callables 75 final ExecutorService executorService = 76 Executors.newFixedThreadPool(threadCount); 77 try { 78 final ExecutorCompletionService<Void> completionService = 79 new ExecutorCompletionService<>(executorService); 80 final Set<Future<Void>> futures = new HashSet<>(); 81 for (final Callable<Void> callable : callables) { 82 futures.add(completionService.submit(callable)); 83 } 84 85 while (futures.size() > 0) { 86 final Future<Void> future = completionService.take(); 87 futures.remove(future); 88 89 try { 90 future.get(); 91 } catch (final ExecutionException e) { 92 for (final Future<Void> f : futures) { 93 f.cancel(true); 94 } 95 96 throw e; 97 } 98 } 99 } finally { 100 executorService.shutdown(); 101 executorService.awaitTermination(10, TimeUnit.SECONDS); 102 } 103 } 104 } 105