1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 package org.rocksdb;
6 
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

import static org.junit.Assert.assertArrayEquals;
20 
21 @RunWith(Parameterized.class)
22 public class WriteBatchThreadedTest {
23 
24   @Parameters(name = "WriteBatchThreadedTest(threadCount={0})")
data()25   public static Iterable<Integer> data() {
26     return Arrays.asList(new Integer[]{1, 10, 50, 100});
27   }
28 
29   @Parameter
30   public int threadCount;
31 
32   @Rule
33   public TemporaryFolder dbFolder = new TemporaryFolder();
34 
35   RocksDB db;
36 
37   @Before
setUp()38   public void setUp() throws Exception {
39     RocksDB.loadLibrary();
40     final Options options = new Options()
41         .setCreateIfMissing(true)
42         .setIncreaseParallelism(32);
43     db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
44     assert (db != null);
45   }
46 
47   @After
tearDown()48   public void tearDown() throws Exception {
49     if (db != null) {
50       db.close();
51     }
52   }
53 
54   @Test
threadedWrites()55   public void threadedWrites() throws InterruptedException, ExecutionException {
56     final List<Callable<Void>> callables = new ArrayList<>();
57     for (int i = 0; i < 100; i++) {
58       final int offset = i * 100;
59       callables.add(new Callable<Void>() {
60         @Override
61         public Void call() throws RocksDBException {
62           try (final WriteBatch wb = new WriteBatch();
63                final WriteOptions w_opt = new WriteOptions()) {
64             for (int i = offset; i < offset + 100; i++) {
65               wb.put(ByteBuffer.allocate(4).putInt(i).array(), "parallel rocks test".getBytes());
66             }
67             db.write(w_opt, wb);
68           }
69           return null;
70         }
71       });
72     }
73 
74     //submit the callables
75     final ExecutorService executorService =
76         Executors.newFixedThreadPool(threadCount);
77     try {
78       final ExecutorCompletionService<Void> completionService =
79           new ExecutorCompletionService<>(executorService);
80       final Set<Future<Void>> futures = new HashSet<>();
81       for (final Callable<Void> callable : callables) {
82         futures.add(completionService.submit(callable));
83       }
84 
85       while (futures.size() > 0) {
86         final Future<Void> future = completionService.take();
87         futures.remove(future);
88 
89         try {
90           future.get();
91         } catch (final ExecutionException e) {
92           for (final Future<Void> f : futures) {
93             f.cancel(true);
94           }
95 
96           throw e;
97         }
98       }
99     } finally {
100       executorService.shutdown();
101       executorService.awaitTermination(10, TimeUnit.SECONDS);
102     }
103   }
104 }
105